hadoop-mapreduce-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1453669 [2/2] - in /hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ bin/ conf/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoo...
Date: Thu, 07 Mar 2013 02:57:55 GMT
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Thu Mar  7 02:57:40 2013
@@ -475,9 +475,9 @@ public class MergeManagerImpl<K, V> impl
           combineCollector.setWriter(writer);
           combineAndSpill(rIter, reduceCombineInputCounter);
         }
-        compressAwarePath = new CompressAwarePath(outputPath,
-            writer.getRawLength());
         writer.close();
+        compressAwarePath = new CompressAwarePath(outputPath,
+            writer.getRawLength(), writer.getCompressedLength());
 
         LOG.info(reduceId +  
             " Merge of the " + noInMemorySegments +
@@ -500,7 +500,7 @@ public class MergeManagerImpl<K, V> impl
   private class OnDiskMerger extends MergeThread<CompressAwarePath,K,V> {
     
     public OnDiskMerger(MergeManagerImpl<K, V> manager) {
-      super(manager, Integer.MAX_VALUE, exceptionReporter);
+      super(manager, ioSortFactor, exceptionReporter);
       setName("OnDiskMerger - Thread to merge on-disk map-outputs");
       setDaemon(true);
     }
@@ -552,9 +552,9 @@ public class MergeManagerImpl<K, V> impl
                             mergedMapOutputsCounter, null);
 
         Merger.writeFile(iter, writer, reporter, jobConf);
-        compressAwarePath = new CompressAwarePath(outputPath,
-            writer.getRawLength());
         writer.close();
+        compressAwarePath = new CompressAwarePath(outputPath,
+            writer.getRawLength(), writer.getCompressedLength());
       } catch (IOException e) {
         localFS.delete(outputPath, true);
         throw e;
@@ -713,13 +713,15 @@ public class MergeManagerImpl<K, V> impl
             keyClass, valueClass, memDiskSegments, numMemDiskSegments,
             tmpDir, comparator, reporter, spilledRecordsCounter, null, 
             mergePhase);
-        final Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
+        Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
             keyClass, valueClass, codec, null);
         try {
           Merger.writeFile(rIter, writer, reporter, job);
-          // add to list of final disk outputs.
+          writer.close();
           onDiskMapOutputs.add(new CompressAwarePath(outputPath,
-              writer.getRawLength()));
+              writer.getRawLength(), writer.getCompressedLength()));
+          writer = null;
+          // add to list of final disk outputs.
         } catch (IOException e) {
           if (null != outputPath) {
             try {
@@ -789,7 +791,7 @@ public class MergeManagerImpl<K, V> impl
       // merges. See comment where mergePhaseFinished is being set
       Progress thisPhase = (mergePhaseFinished) ? null : mergePhase; 
       RawKeyValueIterator diskMerge = Merger.merge(
-          job, fs, keyClass, valueClass, diskSegments,
+          job, fs, keyClass, valueClass, codec, diskSegments,
           ioSortFactor, numInMemSegments, tmpDir, comparator,
           reporter, false, spilledRecordsCounter, null, thisPhase);
       diskSegments.clear();
@@ -808,24 +810,45 @@ public class MergeManagerImpl<K, V> impl
 
   static class CompressAwarePath extends Path {
     private long rawDataLength;
+    private long compressedSize;
 
-    public CompressAwarePath(Path path, long rawDataLength) {
+    public CompressAwarePath(Path path, long rawDataLength, long compressSize) {
       super(path.toUri());
       this.rawDataLength = rawDataLength;
+      this.compressedSize = compressSize;
     }
 
     public long getRawDataLength() {
       return rawDataLength;
     }
-    
+
+    public long getCompressedSize() {
+      return compressedSize;
+    }
+
     @Override
     public boolean equals(Object other) {
       return super.equals(other);
     }
-    
+
     @Override
     public int hashCode() {
       return super.hashCode();
     }
+
+    @Override
+    public int compareTo(Object obj) {
+      if(obj instanceof CompressAwarePath) {
+        CompressAwarePath compPath = (CompressAwarePath) obj;
+        if(this.compressedSize < compPath.getCompressedSize()) {
+          return -1;
+        } else if (this.getCompressedSize() > compPath.getCompressedSize()) {
+          return 1;
+        }
+        // Not returning 0 here so that objects with the same size (but
+        // different paths) are still added to the TreeSet.
+      }
+      return super.compareTo(obj);
+    }
   }
 }

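The compareTo() override above makes on-disk map outputs sort by compressed size, so the OnDiskMerger picks up the smallest files first, while the fall-through to Path.compareTo() keeps distinct outputs of identical size from colliding in the TreeSet backing onDiskMapOutputs. A minimal sketch of that ordering follows; it is illustrative only, assumes it is compiled into the org.apache.hadoop.mapreduce.task.reduce package (CompressAwarePath is package-private), and uses made-up file names and sizes.

    package org.apache.hadoop.mapreduce.task.reduce;

    import java.util.TreeSet;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;

    public class CompressAwarePathOrderingSketch {
      public static void main(String[] args) {
        // Same raw (uncompressed) length, different on-disk (compressed) sizes.
        TreeSet<CompressAwarePath> outputs = new TreeSet<CompressAwarePath>();
        outputs.add(new CompressAwarePath(new Path("/tmp/map_0.out"), 4096L, 1024L));
        outputs.add(new CompressAwarePath(new Path("/tmp/map_1.out"), 4096L, 512L));
        outputs.add(new CompressAwarePath(new Path("/tmp/map_2.out"), 4096L, 512L));
        // Iteration order: map_1.out and map_2.out first (smallest compressed size,
        // kept distinct by the Path fallback), then map_0.out.
        for (CompressAwarePath p : outputs) {
          System.out.println(p + " compressedSize=" + p.getCompressedSize());
        }
      }
    }
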
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java Thu Mar  7 02:57:40 2013
@@ -48,6 +48,7 @@ class OnDiskMapOutput<K, V> extends MapO
   private final Path outputPath;
   private final MergeManagerImpl<K, V> merger;
   private final OutputStream disk; 
+  private long compressedSize;
 
   public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                          MergeManagerImpl<K, V> merger, long size,
@@ -108,13 +109,14 @@ class OnDiskMapOutput<K, V> extends MapO
                             bytesLeft + " bytes missing of " + 
                             compressedLength + ")");
     }
+    this.compressedSize = compressedLength;
   }
 
   @Override
   public void commit() throws IOException {
     localFS.rename(tmpOutputPath, outputPath);
     CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath,
-        getSize());
+        getSize(), this.compressedSize);
     merger.closeOnDiskFile(compressAwarePath);
   }
   

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java Thu Mar  7 02:57:40 2013
@@ -521,6 +521,8 @@ public class ConfigUtil {
     });
     Configuration.addDeprecation("mapreduce.user.classpath.first",
       MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST);
+    Configuration.addDeprecation(JTConfig.JT_MAX_JOB_SPLIT_METAINFO_SIZE,
+        MRJobConfig.SPLIT_METAINFO_MAXSIZE);
   }
 
   public static void main(String[] args) {

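The added deprecation maps the old JobTracker limit for job split metainfo onto MRJobConfig.SPLIT_METAINFO_MAXSIZE, so jobs that still set the old key keep taking effect. A small sketch of the behaviour, under the assumptions that JTConfig lives in org.apache.hadoop.mapreduce.server.jobtracker and that constructing a JobConf is what loads ConfigUtil and registers these deprecations:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.MRJobConfig;
    import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;

    public class SplitMetaInfoDeprecationSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // A job still using the deprecated JobTracker key...
        conf.setLong(JTConfig.JT_MAX_JOB_SPLIT_METAINFO_SIZE, 20L * 1000 * 1000);
        // ...is visible through the new MRJobConfig key after this change.
        System.out.println(conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE, -1L));
      }
    }
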
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Mar  7 02:57:40 2013
@@ -296,6 +296,14 @@
 </property>
 
 <property>
+  <name>mapreduce.shuffle.max.connections</name>
+  <value>0</value>
+  <description>Max allowed connections for the shuffle.  Set to 0 (zero)
+               to indicate no limit on the number of connections.
+  </description>
+</property>
+
+<property>
   <name>mapreduce.reduce.markreset.buffer.percent</name>
   <value>0.0</value>
   <description>The percentage of memory -relative to the maximum heap size- to

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1449958-1453659

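The new mapreduce.shuffle.max.connections key above is enforced by the ShuffleHandler change further down in this commit; the shipped default of 0 leaves the limit disabled. A minimal sketch of reading and overriding it programmatically, using the constants that change introduces (in a real cluster the key would normally be set in mapred-site.xml on the NodeManagers, and 64 is just an example value):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.ShuffleHandler;

    public class ShuffleMaxConnectionsSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();  // picks up mapred-default.xml / mapred-site.xml
        int max = conf.getInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS,
            ShuffleHandler.DEFAULT_MAX_SHUFFLE_CONNECTIONS);
        System.out.println("max shuffle connections = " + max);  // 0 means unlimited
        // Cap concurrent shuffle connections, as TestShuffleHandler below does with 3.
        conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 64);
      }
    }
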
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java Thu Mar  7 02:57:40 2013
@@ -17,28 +17,38 @@
  */
 package org.apache.hadoop.mapreduce.task.reduce;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MROutputFiles;
 import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl.CompressAwarePath;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 public class TestMergeManager {
 
   @Test(timeout=10000)
-  @SuppressWarnings("unchecked")
   public void testMemoryMerge() throws Exception {
     final int TOTAL_MEM_BYTES = 10000;
     final int OUTPUT_SIZE = 7950;
@@ -195,4 +205,59 @@ public class TestMergeManager {
       return exceptions.size();
     }
   }
+
+  @SuppressWarnings({ "unchecked", "deprecation" })
+  @Test(timeout=10000)
+  public void testOnDiskMerger() throws IOException, URISyntaxException,
+    InterruptedException {
+    JobConf jobConf = new JobConf();
+    final int SORT_FACTOR = 5;
+    jobConf.setInt(MRJobConfig.IO_SORT_FACTOR, SORT_FACTOR);
+
+    MapOutputFile mapOutputFile = new MROutputFiles();
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    MergeManagerImpl<IntWritable, IntWritable> manager =
+      new MergeManagerImpl<IntWritable, IntWritable>(null, jobConf, fs, null
+        , null, null, null, null, null, null, null, null, null, mapOutputFile);
+
+    MergeThread<MapOutput<IntWritable, IntWritable>, IntWritable, IntWritable>
+      onDiskMerger = (MergeThread<MapOutput<IntWritable, IntWritable>,
+        IntWritable, IntWritable>) Whitebox.getInternalState(manager,
+          "onDiskMerger");
+    int mergeFactor = (Integer) Whitebox.getInternalState(onDiskMerger,
+      "mergeFactor");
+
+    // make sure the io.sort.factor is set properly
+    assertEquals(mergeFactor, SORT_FACTOR);
+
+    // Stop the onDiskMerger thread so that we can intercept the list of files
+    // waiting to be merged.
+    onDiskMerger.suspend();
+
+    //Send the list of fake files waiting to be merged
+    Random rand = new Random();
+    for(int i = 0; i < 2*SORT_FACTOR; ++i) {
+      Path path = new Path("somePath");
+      CompressAwarePath cap = new CompressAwarePath(path, 1l, rand.nextInt());
+      manager.closeOnDiskFile(cap);
+    }
+
+    //Check that the files pending to be merged are in sorted order.
+    LinkedList<List<CompressAwarePath>> pendingToBeMerged =
+      (LinkedList<List<CompressAwarePath>>) Whitebox.getInternalState(
+        onDiskMerger, "pendingToBeMerged");
+    assertTrue("No inputs were added to list pending to merge",
+      pendingToBeMerged.size() > 0);
+    for(int i = 0; i < pendingToBeMerged.size(); ++i) {
+      List<CompressAwarePath> inputs = pendingToBeMerged.get(i);
+      for(int j = 1; j < inputs.size(); ++j) {
+        assertTrue("Not enough / too many inputs were going to be merged",
+          inputs.size() > 0 && inputs.size() <= SORT_FACTOR);
+        assertTrue("Inputs to be merged were not sorted according to size: ",
+          inputs.get(j).getCompressedSize()
+          >= inputs.get(j-1).getCompressedSize());
+      }
+    }
+
+  }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java Thu Mar  7 02:57:40 2013
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -107,6 +108,11 @@ public class CompletedTaskAttempt implem
   }
 
   @Override
+  public Phase getPhase() {
+    return Phase.CLEANUP;
+  }
+
+  @Override
   public TaskAttemptState getState() {
     return state;
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Thu Mar  7 02:57:40 2013
@@ -404,7 +404,7 @@ public class TestJobHistoryParsing {
     }
   }
   
-  @Test
+  @Test (timeout=5000)
   public void testCountersForFailedTask() throws Exception {
     LOG.info("STARTING testCountersForFailedTask");
     try {
@@ -455,6 +455,9 @@ public class TestJobHistoryParsing {
       CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue());
       Assert.assertNotNull("completed task report has null counters",
           ct.getReport().getCounters());
+      // Make sure each completed task has counters, and the counters are not empty
+      Assert.assertTrue(ct.getReport().getCounters()
+          .getAllCounterGroups().size() > 0);
     }
     } finally {
       LOG.info("FINISHED testCountersForFailedTask");

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Thu Mar  7 02:57:40 2013
@@ -106,8 +106,9 @@ public class ResourceMgrDelegate extends
 
   public QueueInfo getQueue(String queueName) throws IOException,
   InterruptedException {
-    return TypeConverter.fromYarn(
-        super.getQueueInfo(queueName), this.conf);
+    org.apache.hadoop.yarn.api.records.QueueInfo queueInfo =
+        super.getQueueInfo(queueName);
+    return (queueInfo == null) ? null : TypeConverter.fromYarn(queueInfo, conf);
   }
 
   public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java Thu Mar  7 02:57:40 2013
@@ -45,7 +45,7 @@ public class MiniMRClientClusterFactory 
 
     FileSystem fs = FileSystem.get(conf);
 
-    Path testRootDir = new Path("target", caller.getName() + "-tmpDir")
+    Path testRootDir = new Path("target", caller.getSimpleName() + "-tmpDir")
         .makeQualified(fs);
     Path appJar = new Path(testRootDir, "MRAppJar.jar");
 
@@ -66,9 +66,9 @@ public class MiniMRClientClusterFactory 
     job.addFileToClassPath(remoteCallerJar);
 
     MiniMRYarnCluster miniMRYarnCluster = new MiniMRYarnCluster(caller
-        .getName(), noOfNMs);
+        .getSimpleName(), noOfNMs);
     job.getConfiguration().set("minimrclientcluster.caller.name",
-        caller.getName());
+        caller.getSimpleName());
     job.getConfiguration().setInt("minimrclientcluster.nodemanagers.number",
         noOfNMs);
     miniMRYarnCluster.init(job.getConfiguration());

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobConf.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobConf.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobConf.java Thu Mar  7 02:57:40 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.ClassUtil;
 
 
 import static org.junit.Assert.*;
@@ -79,7 +80,7 @@ public class TestJobConf {
     Class clazz = Class.forName(CLASSNAME, true, cl);
     assertNotNull(clazz);
 
-    String containingJar = JobConf.findContainingJar(clazz);
+    String containingJar = ClassUtil.findContainingJar(clazz);
     assertEquals(jar.getAbsolutePath(), containingJar);
   }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java Thu Mar  7 02:57:40 2013
@@ -61,8 +61,12 @@ import org.apache.hadoop.util.Reflection
  */
 public class TestMapProgress extends TestCase {
   public static final Log LOG = LogFactory.getLog(TestMapProgress.class);
-  private static String TEST_ROOT_DIR = new File(System.getProperty(
-           "test.build.data", "/tmp")).getAbsolutePath() + "/mapPahseprogress";
+  private static String TEST_ROOT_DIR;
+  static {
+    String root = new File(System.getProperty("test.build.data", "/tmp"))
+      .getAbsolutePath();
+    TEST_ROOT_DIR = new Path(root, "mapPhaseprogress").toString();
+  }
 
   static class FakeUmbilical implements TaskUmbilicalProtocol {
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Thu Mar  7 02:57:40 2013
@@ -20,11 +20,14 @@ package org.apache.hadoop.mapreduce.lib.
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URI;
+import java.util.HashMap;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Set;
 import java.util.zip.GZIPOutputStream;
 import java.util.concurrent.TimeoutException;
 
+import junit.framework.Assert;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.fs.*;
@@ -42,9 +45,13 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneBlockInfo;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneFileInfo;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.junit.Test;
 
+import com.google.common.collect.HashMultiset;
+
 public class TestCombineFileInputFormat extends TestCase {
 
   private static final String rack1[] = new String[] {
@@ -476,23 +483,23 @@ public class TestCombineFileInputFormat 
       assertEquals(BLOCKSIZE, fileSplit.getLength(1));
       assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
       fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
-      assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(0));
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
       assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
-      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+      assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
       assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
+      assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
       fileSplit = (CombineFileSplit) splits.get(2);
       assertEquals(2, fileSplit.getNumPaths());
       assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file4.getName(), fileSplit.getPath(0).getName());
-      assertEquals(BLOCKSIZE, fileSplit.getOffset(0));
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
       assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+      assertEquals(file3.getName(), fileSplit.getPath(1).getName());
       assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(1));
       assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
+      assertEquals("host1.rack1.com", fileSplit.getLocations()[0]);
 
       // maximum split size is 3 blocks 
       inFormat = new DummyInputFormat();
@@ -504,7 +511,7 @@ public class TestCombineFileInputFormat 
       for (InputSplit split : splits) {
         System.out.println("File split(Test5): " + split);
       }
-      assertEquals(4, splits.size());
+      assertEquals(3, splits.size());
       fileSplit = (CombineFileSplit) splits.get(0);
       assertEquals(3, fileSplit.getNumPaths());
       assertEquals(1, fileSplit.getLocations().length);
@@ -519,32 +526,28 @@ public class TestCombineFileInputFormat 
       assertEquals(BLOCKSIZE, fileSplit.getLength(2));
       assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
       fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(file4.getName(), fileSplit.getPath(0).getName());
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
       assertEquals(0, fileSplit.getOffset(0));
       assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
       assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
       assertEquals(BLOCKSIZE, fileSplit.getLength(1));
       assertEquals(file4.getName(), fileSplit.getPath(2).getName());
-      assertEquals( 2 * BLOCKSIZE, fileSplit.getOffset(2));
+      assertEquals(0, fileSplit.getOffset(2));
       assertEquals(BLOCKSIZE, fileSplit.getLength(2));
-      assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
+      assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
       fileSplit = (CombineFileSplit) splits.get(2);
-      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(3, fileSplit.getNumPaths());
       assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
       assertEquals(0, fileSplit.getOffset(0));
       assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
       assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
       assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
-      fileSplit = (CombineFileSplit) splits.get(3);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+      assertEquals(file4.getName(), fileSplit.getPath(2).getName());
+      assertEquals(2*BLOCKSIZE, fileSplit.getOffset(2));
+      assertEquals(BLOCKSIZE, fileSplit.getLength(2));
       assertEquals("host1.rack1.com", fileSplit.getLocations()[0]);
 
       // maximum split size is 4 blocks 
@@ -713,6 +716,56 @@ public class TestCombineFileInputFormat 
     DFSTestUtil.waitReplication(fileSys, name, replication);
   }
   
+  public void testNodeInputSplit() throws IOException, InterruptedException {
+    // Regression test for MAPREDUCE-4892. There are 2 nodes with all blocks on 
+    // both nodes. The grouping ensures that both nodes get splits instead of 
+    // just the first node
+    DummyInputFormat inFormat = new DummyInputFormat();
+    int numBlocks = 12;
+    long totLength = 0;
+    long blockSize = 100;
+    long maxSize = 200;
+    long minSizeNode = 50;
+    long minSizeRack = 50;
+    String[] locations = { "h1", "h2" };
+    String[] racks = new String[0];
+    Path path = new Path("hdfs://file");
+    
+    OneBlockInfo[] blocks = new OneBlockInfo[numBlocks];
+    for(int i=0; i<numBlocks; ++i) {
+      blocks[i] = new OneBlockInfo(path, i*blockSize, blockSize, locations, racks);
+      totLength += blockSize;
+    }
+    
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    HashMap<String, Set<String>> rackToNodes = 
+                              new HashMap<String, Set<String>>();
+    HashMap<String, List<OneBlockInfo>> rackToBlocks = 
+                              new HashMap<String, List<OneBlockInfo>>();
+    HashMap<OneBlockInfo, String[]> blockToNodes = 
+                              new HashMap<OneBlockInfo, String[]>();
+    HashMap<String, List<OneBlockInfo>> nodeToBlocks = 
+                              new HashMap<String, List<OneBlockInfo>>();
+    
+    OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes, 
+                             nodeToBlocks, rackToNodes);
+    
+    inFormat.createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,  
+                          maxSize, minSizeNode, minSizeRack, splits);
+    
+    int expectedSplitCount = (int)(totLength/maxSize);
+    Assert.assertEquals(expectedSplitCount, splits.size());
+    HashMultiset<String> nodeSplits = HashMultiset.create();
+    for(int i=0; i<expectedSplitCount; ++i) {
+      InputSplit inSplit = splits.get(i);
+      Assert.assertEquals(maxSize, inSplit.getLength());
+      Assert.assertEquals(1, inSplit.getLocations().length);
+      nodeSplits.add(inSplit.getLocations()[0]);
+    }
+    Assert.assertEquals(3, nodeSplits.count(locations[0]));
+    Assert.assertEquals(3, nodeSplits.count(locations[1]));
+  }
+  
   public void testSplitPlacementForCompressedFiles() throws Exception {
     MiniDFSCluster dfs = null;
     FileSystem fileSys = null;
@@ -889,24 +942,24 @@ public class TestCombineFileInputFormat 
       assertEquals(f3.getLen(), fileSplit.getLength(0));
       assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
       fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(file4.getName(), fileSplit.getPath(0).getName());
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
       assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f4.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
       fileSplit = (CombineFileSplit) splits.get(2);
       assertEquals(1, fileSplit.getNumPaths());
       assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
       assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f2.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
       fileSplit = (CombineFileSplit) splits.get(3);
       assertEquals(1, fileSplit.getNumPaths());
       assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(file4.getName(), fileSplit.getPath(0).getName());
       assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f1.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+      assertEquals(f4.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
 
       // maximum split size is twice file1's length
       inFormat = new DummyInputFormat();

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Thu Mar  7 02:57:40 2013
@@ -25,6 +25,8 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.jar.JarOutputStream;
 import java.util.zip.ZipEntry;
 import org.apache.commons.io.FileUtils;
@@ -103,6 +105,8 @@ public class TestMRJobs {
   private static Path TEST_ROOT_DIR = new Path("target",
       TestMRJobs.class.getName() + "-tmpDir").makeQualified(localFs);
   static Path APP_JAR = new Path(TEST_ROOT_DIR, "MRAppJar.jar");
+  private static final String OUTPUT_ROOT_DIR = "/tmp/" +
+    TestMRJobs.class.getSimpleName();
 
   @BeforeClass
   public static void setup() throws IOException {
@@ -140,7 +144,7 @@ public class TestMRJobs {
     }
   }
 
-  @Test
+  @Test (timeout = 300000)
   public void testSleepJob() throws IOException, InterruptedException,
       ClassNotFoundException { 
     LOG.info("\n\n\nStarting testSleepJob().");
@@ -211,7 +215,7 @@ public class TestMRJobs {
     }
   }
 
-  @Test
+  @Test (timeout = 30000)
   public void testRandomWriter() throws IOException, InterruptedException,
       ClassNotFoundException {
     
@@ -226,8 +230,7 @@ public class TestMRJobs {
     mrCluster.getConfig().set(RandomTextWriterJob.TOTAL_BYTES, "3072");
     mrCluster.getConfig().set(RandomTextWriterJob.BYTES_PER_MAP, "1024");
     Job job = randomWriterJob.createJob(mrCluster.getConfig());
-    Path outputDir =
-        new Path(mrCluster.getTestWorkDir().getAbsolutePath(), "random-output");
+    Path outputDir = new Path(OUTPUT_ROOT_DIR, "random-output");
     FileOutputFormat.setOutputPath(job, outputDir);
     job.setSpeculativeExecution(false);
     job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
@@ -274,7 +277,7 @@ public class TestMRJobs {
             && counters.findCounter(JobCounter.SLOTS_MILLIS_MAPS).getValue() != 0);
   }
 
-  @Test
+  @Test (timeout = 30000)
   public void testFailingMapper() throws IOException, InterruptedException,
       ClassNotFoundException {
 
@@ -342,9 +345,8 @@ public class TestMRJobs {
     job.setMapperClass(FailingMapper.class);
     job.setNumReduceTasks(0);
     
-    FileOutputFormat.setOutputPath(job,
-        new Path(mrCluster.getTestWorkDir().getAbsolutePath(),
-        "failmapper-output"));
+    FileOutputFormat.setOutputPath(job, new Path(OUTPUT_ROOT_DIR,
+      "failmapper-output"));
     job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
     job.submit();
     String trackingUrl = job.getTrackingURL();
@@ -357,7 +359,7 @@ public class TestMRJobs {
     return job;
   }
 
-  //@Test
+  //@Test (timeout = 30000)
   public void testSleepJobWithSecurityOn() throws IOException,
       InterruptedException, ClassNotFoundException {
 
@@ -425,14 +427,22 @@ public class TestMRJobs {
       Assert.assertEquals(2, archives.length);
 
       // Check lengths of the files
-      Assert.assertEquals(1, localFs.getFileStatus(files[1]).getLen());
-      Assert.assertTrue(localFs.getFileStatus(files[2]).getLen() > 1);
+      Map<String, Path> filesMap = pathsToMap(files);
+      Assert.assertTrue(filesMap.containsKey("distributed.first.symlink"));
+      Assert.assertEquals(1, localFs.getFileStatus(
+        filesMap.get("distributed.first.symlink")).getLen());
+      Assert.assertTrue(filesMap.containsKey("distributed.second.jar"));
+      Assert.assertTrue(localFs.getFileStatus(
+        filesMap.get("distributed.second.jar")).getLen() > 1);
 
       // Check extraction of the archive
-      Assert.assertTrue(localFs.exists(new Path(archives[0],
-          "distributed.jar.inside3")));
-      Assert.assertTrue(localFs.exists(new Path(archives[1],
-          "distributed.jar.inside4")));
+      Map<String, Path> archivesMap = pathsToMap(archives);
+      Assert.assertTrue(archivesMap.containsKey("distributed.third.jar"));
+      Assert.assertTrue(localFs.exists(new Path(
+        archivesMap.get("distributed.third.jar"), "distributed.jar.inside3")));
+      Assert.assertTrue(archivesMap.containsKey("distributed.fourth.jar"));
+      Assert.assertTrue(localFs.exists(new Path(
+        archivesMap.get("distributed.fourth.jar"), "distributed.jar.inside4")));
 
       // Check the class loaders
       LOG.info("Java Classpath: " + System.getProperty("java.class.path"));
@@ -460,6 +470,23 @@ public class TestMRJobs {
       Assert.assertTrue(FileUtils.isSymlink(jobJarDir));
       Assert.assertTrue(jobJarDir.isDirectory());
     }
+
+    /**
+     * Returns a mapping of the final component of each path to the corresponding
+     * Path instance.  This assumes that every given Path has a unique string in
+     * the final path component, which is true for these tests.
+     * 
+     * @param paths Path[] to map
+     * @return Map<String, Path> mapping the final component of each path to the
+     *   corresponding Path instance
+     */
+    private static Map<String, Path> pathsToMap(Path[] paths) {
+      Map<String, Path> map = new HashMap<String, Path>();
+      for (Path path: paths) {
+        map.put(path.getName(), path);
+      }
+      return map;
+    }
   }
 
   public void _testDistributedCache(String jobJarPath) throws Exception {
@@ -515,7 +542,7 @@ public class TestMRJobs {
           trackingUrl.endsWith(jobId.substring(jobId.lastIndexOf("_")) + "/"));
   }
   
-  @Test
+  @Test (timeout = 300000)
   public void testDistributedCache() throws Exception {
     // Test with a local (file:///) Job Jar
     Path localJobJarPath = makeJobJarWithLib(TEST_ROOT_DIR.toUri().toString());

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java Thu Mar  7 02:57:40 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.List;
 
 import junit.framework.Assert;
@@ -58,6 +59,9 @@ public class TestMRJobsWithHistoryServic
   private static final Log LOG =
     LogFactory.getLog(TestMRJobsWithHistoryService.class);
 
+  private static final EnumSet<RMAppState> TERMINAL_RM_APP_STATES =
+    EnumSet.of(RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED);
+
   private static MiniMRYarnCluster mrCluster;
 
   private static Configuration conf = new Configuration();
@@ -108,7 +112,7 @@ public class TestMRJobsWithHistoryServic
     }
   }
 
-  @Test
+  @Test (timeout = 30000)
   public void testJobHistoryData() throws IOException, InterruptedException,
       AvroRemoteException, ClassNotFoundException {
     if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
@@ -129,12 +133,24 @@ public class TestMRJobsWithHistoryServic
     Counters counterMR = job.getCounters();
     JobId jobId = TypeConverter.toYarn(job.getJobID());
     ApplicationId appID = jobId.getAppId();
+    int pollElapsed = 0;
     while (true) {
       Thread.sleep(1000);
-      if (mrCluster.getResourceManager().getRMContext().getRMApps()
-          .get(appID).getState().equals(RMAppState.FINISHED))
+      pollElapsed += 1000;
+
+      if (TERMINAL_RM_APP_STATES.contains(
+          mrCluster.getResourceManager().getRMContext().getRMApps().get(appID)
+          .getState())) {
+        break;
+      }
+
+      if (pollElapsed >= 60000) {
+        LOG.warn("application did not reach terminal state within 60 seconds");
         break;
+      }
     }
+    Assert.assertEquals(RMAppState.FINISHED, mrCluster.getResourceManager()
+      .getRMContext().getRMApps().get(appID).getState());
     Counters counterHS = job.getCounters();
     //TODO the Assert below worked. need to check
     //Should we compare each field or convert to V2 counter and compare

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/testshell/ExternalMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/testshell/ExternalMapReduce.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/testshell/ExternalMapReduce.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/testshell/ExternalMapReduce.java Thu Mar  7 02:57:40 2013
@@ -72,7 +72,7 @@ public class ExternalMapReduce extends C
       }
       //fork off ls to see if the file exists.
       // java file.exists() will not work on 
-      // cygwin since it is a symlink
+      // Windows since it is a symlink
       String[] argv = new String[7];
       argv[0] = "ls";
       argv[1] = "files_tmp";

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java Thu Mar  7 02:57:40 2013
@@ -69,8 +69,10 @@ public class FadvisedChunkedFile extends
     }
     if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
       try {
-        NativeIO.posixFadviseIfPossible(fd, getStartOffset(), getEndOffset()
-            - getStartOffset(), NativeIO.POSIX_FADV_DONTNEED);
+        NativeIO.POSIX.posixFadviseIfPossible(
+            fd,
+            getStartOffset(), getEndOffset() - getStartOffset(),
+            NativeIO.POSIX.POSIX_FADV_DONTNEED);
       } catch (Throwable t) {
         LOG.warn("Failed to manage OS cache for " + identifier, t);
       }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java Thu Mar  7 02:57:40 2013
@@ -71,8 +71,9 @@ public class FadvisedFileRegion extends 
     }
     if (manageOsCache && getCount() > 0) {
       try {
-        NativeIO.posixFadviseIfPossible(fd, getPosition(), getCount(),
-            NativeIO.POSIX_FADV_DONTNEED);
+        NativeIO.POSIX.posixFadviseIfPossible(
+           fd, getPosition(), getCount(),
+           NativeIO.POSIX.POSIX_FADV_DONTNEED);
       } catch (Throwable t) {
         LOG.warn("Failed to manage OS cache for " + identifier, t);
       }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Thu Mar  7 02:57:40 2013
@@ -88,6 +88,7 @@ import org.jboss.netty.channel.ChannelFu
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
@@ -121,7 +122,7 @@ public class ShuffleHandler extends Abst
 
   public static final String SHUFFLE_READAHEAD_BYTES = "mapreduce.shuffle.readahead.bytes";
   public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
-
+  
   // pattern to identify errors related to the client closing the socket early
   // idea borrowed from Netty SslHandler
   private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
@@ -133,15 +134,15 @@ public class ShuffleHandler extends Abst
   private final ChannelGroup accepted = new DefaultChannelGroup();
   protected HttpPipelineFactory pipelineFact;
   private int sslFileBufferSize;
-
+  
   /**
    * Should the shuffle use posix_fadvise calls to manage the OS cache during
    * sendfile
    */
   private boolean manageOsCache;
   private int readaheadLength;
+  private int maxShuffleConnections;
   private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
-   
 
   public static final String MAPREDUCE_SHUFFLE_SERVICEID =
       "mapreduce.shuffle";
@@ -159,6 +160,9 @@ public class ShuffleHandler extends Abst
 
   public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
 
+  public static final String MAX_SHUFFLE_CONNECTIONS = "mapreduce.shuffle.max.connections";
+  public static final int DEFAULT_MAX_SHUFFLE_CONNECTIONS = 0; // 0 implies no limit
+
   @Metrics(about="Shuffle output metrics", context="mapred")
   static class ShuffleMetrics implements ChannelFutureListener {
     @Metric("Shuffle output in bytes")
@@ -270,6 +274,9 @@ public class ShuffleHandler extends Abst
     readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
         DEFAULT_SHUFFLE_READAHEAD_BYTES);
     
+    maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS, 
+                                        DEFAULT_MAX_SHUFFLE_CONNECTIONS);
+
     ThreadFactory bossFactory = new ThreadFactoryBuilder()
       .setNameFormat("ShuffleHandler Netty Boss #%d")
       .build();
@@ -400,6 +407,21 @@ public class ShuffleHandler extends Abst
     }
 
     @Override
+    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) 
+        throws Exception {
+      if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) {
+        LOG.info(String.format("Current number of shuffle connections (%d) is " + 
+            "greater than or equal to the max allowed shuffle connections (%d)", 
+            accepted.size(), maxShuffleConnections));
+        evt.getChannel().close();
+        return;
+      }
+      accepted.add(evt.getChannel());
+      super.channelOpen(ctx, evt);
+     
+    }
+
+    @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
         throws Exception {
       HttpRequest request = (HttpRequest) evt.getMessage();
@@ -620,6 +642,5 @@ public class ShuffleHandler extends Abst
         sendError(ctx, INTERNAL_SERVER_ERROR);
       }
     }
-
   }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Thu Mar  7 02:57:40 2013
@@ -24,13 +24,15 @@ import static org.apache.hadoop.test.Moc
 import static org.apache.hadoop.test.MockitoMaker.stub;
 import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
 import static org.junit.Assert.assertEquals;
-
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.HttpURLConnection;
+import java.net.SocketException;
 import java.net.URL;
 import java.util.ArrayList;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
@@ -47,10 +49,13 @@ import org.jboss.netty.handler.codec.htt
 import org.junit.Assert;
 import org.junit.Test;
 
+
 public class TestShuffleHandler {
-  static final long MiB = 1024 * 1024;
+  static final long MiB = 1024 * 1024; 
+  private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class);
 
-  @Test public void testSerializeMeta()  throws Exception {
+  @Test (timeout = 10000)
+  public void testSerializeMeta()  throws Exception {
     assertEquals(1, ShuffleHandler.deserializeMetaData(
         ShuffleHandler.serializeMetaData(1)));
     assertEquals(-1, ShuffleHandler.deserializeMetaData(
@@ -59,7 +64,8 @@ public class TestShuffleHandler {
         ShuffleHandler.serializeMetaData(8080)));
   }
 
-  @Test public void testShuffleMetrics() throws Exception {
+  @Test (timeout = 10000)
+  public void testShuffleMetrics() throws Exception {
     MetricsSystem ms = new MetricsSystemImpl();
     ShuffleHandler sh = new ShuffleHandler(ms);
     ChannelFuture cf = make(stub(ChannelFuture.class).
@@ -88,7 +94,7 @@ public class TestShuffleHandler {
     assertGauge("ShuffleConnections", connections, rb);
   }
 
-  @Test
+  @Test (timeout = 10000)
   public void testClientClosesConnection() throws Exception {
     final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
     Configuration conf = new Configuration();
@@ -159,4 +165,84 @@ public class TestShuffleHandler {
     Assert.assertTrue("sendError called when client closed connection",
         failures.size() == 0);
   }
+  
+  @Test (timeout = 10000)
+  public void testMaxConnections() throws Exception {
+    
+    Configuration conf = new Configuration();
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+    ShuffleHandler shuffleHandler = new ShuffleHandler() {
+      @Override
+      protected Shuffle getShuffle(Configuration conf) {
+        // replace the shuffle handler with one stubbed for testing
+        return new Shuffle(conf) {
+          @Override
+          protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+              HttpRequest request, HttpResponse response, URL requestUri)
+                  throws IOException {
+          }
+          @Override
+          protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
+              Channel ch, String user, String jobId, String mapId, int reduce)
+                  throws IOException {
+            // send a shuffle header and a lot of data down the channel
+            // to trigger a broken pipe
+            ShuffleHeader header =
+                new ShuffleHeader("dummy_header", 5678, 5678, 1);
+            DataOutputBuffer dob = new DataOutputBuffer();
+            header.write(dob);
+            ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+            dob = new DataOutputBuffer();
+            for (int i=0; i<100000; ++i) {
+              header.write(dob);
+            }
+            return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+          }
+        };
+      }
+    };
+    shuffleHandler.init(conf);
+    shuffleHandler.start();
+
+    // setup connections
+    int connAttempts = 3;
+    HttpURLConnection conns[] = new HttpURLConnection[connAttempts];
+
+    for (int i = 0; i < connAttempts; i++) {
+      String URLstring = "http://127.0.0.1:" 
+           + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+           + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_"
+           + i + "_0";
+      URL url = new URL(URLstring);
+      conns[i] = (HttpURLConnection)url.openConnection();
+    }
+
+    // Try to open numerous connections
+    for (int i = 0; i < connAttempts; i++) {
+      conns[i].connect();
+    }
+
+    //Ensure first connections are okay
+    conns[0].getInputStream();
+    int rc = conns[0].getResponseCode();
+    Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+    
+    conns[1].getInputStream();
+    rc = conns[1].getResponseCode();
+    Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
+
+    // This connection should be closed because it is above the limit
+    try {
+      conns[2].getInputStream();
+      rc = conns[2].getResponseCode();
+      Assert.fail("Expected a SocketException");
+    } catch (SocketException se) {
+      LOG.info("Expected - connection should not be open");
+    } catch (Exception e) {
+      Assert.fail("Expected a SocketException");
+    }
+    
+    shuffleHandler.stop(); 
+  }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml Thu Mar  7 02:57:40 2013
@@ -130,7 +130,7 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.jboss.netty</groupId>
+      <groupId>io.netty</groupId>
       <artifactId>netty</artifactId>
     </dependency>
     <dependency>
@@ -176,6 +176,10 @@
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <configuration>
+         <environmentVariables>
+           <!-- HADOOP_HOME required for tests on Windows to find winutils -->
+           <HADOOP_HOME>${basedir}/../../../hadoop-common-project/hadoop-common/target</HADOOP_HOME>
+         </environmentVariables>
          <properties>
            <property>
              <name>listener</name>

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/pom.xml?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/pom.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/pom.xml Thu Mar  7 02:57:40 2013
@@ -61,7 +61,7 @@
           <artifactId>ant</artifactId>
         </exclusion>
         <exclusion>
-          <groupId>org.jboss.netty</groupId>
+          <groupId>io.netty</groupId>
           <artifactId>netty</artifactId>
         </exclusion>
         <exclusion>
@@ -151,7 +151,7 @@
       <artifactId>junit</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.jboss.netty</groupId>
+      <groupId>io.netty</groupId>
       <artifactId>netty</artifactId>
     </dependency>
     <dependency>
@@ -182,15 +182,8 @@
               <target if="tar">
                 <!-- Using Unix script to preserve symlinks -->
                 <echo file="${project.build.directory}/dist-maketar.sh">
-
-                  which cygpath 2&gt; /dev/null
-                  if [ $? = 1 ]; then
-                    BUILD_DIR="${project.build.directory}"
-                  else
-                    BUILD_DIR=`cygpath --unix '${project.build.directory}'`
-                  fi
-                  cd $BUILD_DIR
-                  tar czf ${project.artifactId}-${project.version}.tar.gz ${project.artifactId}-${project.version}
+                  cd "${project.build.directory}"
+                  tar cf - ${project.artifactId}-${project.version} | gzip > ${project.artifactId}-${project.version}.tar.gz
                 </echo>
                 <exec executable="sh" dir="${project.build.directory}" failonerror="true">
                   <arg line="./dist-maketar.sh"/>


