hadoop-common-commits mailing list archives

From jl...@apache.org
Subject git commit: MAPREDUCE-5873. Shuffle bandwidth computation includes time spent waiting for maps. Contributed by Siqi Li (cherry picked from commit b9edad64034a9c8a121ec2b37792c190ba561e26)
Date Wed, 15 Oct 2014 16:04:01 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6 eb055eadd -> 9203ea7ea


MAPREDUCE-5873. Shuffle bandwidth computation includes time spent waiting for maps. Contributed by Siqi Li
(cherry picked from commit b9edad64034a9c8a121ec2b37792c190ba561e26)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9203ea7e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9203ea7e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9203ea7e

Branch: refs/heads/branch-2.6
Commit: 9203ea7ea94e274e5d0f01817ea1b9b1e56875b9
Parents: eb055ea
Author: Jason Lowe <jlowe@apache.org>
Authored: Wed Oct 15 15:52:53 2014 +0000
Committer: Jason Lowe <jlowe@apache.org>
Committed: Wed Oct 15 15:59:56 2014 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../hadoop/mapreduce/task/reduce/Fetcher.java   |   2 +-
 .../mapreduce/task/reduce/LocalFetcher.java     |   2 +-
 .../task/reduce/ShuffleSchedulerImpl.java       | 107 +++++++++++--
 .../task/reduce/TestShuffleScheduler.java       | 157 +++++++++++++++++++
 5 files changed, 257 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
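
For context: before this patch, ShuffleSchedulerImpl computed the shuffle transfer rate as total bytes shuffled divided by the wall-clock seconds since the reduce task started, so time spent waiting for map outputs dragged the reported bandwidth down. The patch instead records each copy's [start, end] window in a CopyTimeTracker and divides total bytes by the union of those windows, i.e. only the time during which at least one copy was actually running. As a rough worked example (numbers illustrative, not from the patch): a reducer that shuffles 100 MB during 50 s of actual copying but waits another 450 s for maps used to report about 100/500 = 0.2 MB/s; with this change it reports 100/50 = 2 MB/s. The status line now also shows the rate of the individual copy that just finished alongside the aggregate.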


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9203ea7e/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 0134c06..a9aaab8 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -195,6 +195,9 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-6115. TestPipeApplication#testSubmitter fails in trunk (Binglin
     Chang via jlowe)
 
+    MAPREDUCE-5873. Shuffle bandwidth computation includes time spent waiting
+    for maps (Siqi Li via jlowe)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9203ea7e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index a416200..796394f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -544,7 +544,7 @@ class Fetcher<K,V> extends Thread {
       retryStartTime = 0;
       
       scheduler.copySucceeded(mapId, host, compressedLength, 
-                              endTime - startTime, mapOutput);
+                              startTime, endTime, mapOutput);
       // Note successful shuffle
       remaining.remove(mapId);
       metrics.successFetch();
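
The change above has Fetcher pass the copy's start and end timestamps instead of the precomputed duration, so the scheduler can see when each copy ran, not just how long it took, and can account for overlap between concurrent fetchers.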

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9203ea7e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
index 98256c2..6794c99 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.java
@@ -162,7 +162,7 @@ class LocalFetcher<K,V> extends Fetcher<K, V> {
       }
     }
 
-    scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0,
+    scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,
         mapOutput);
     return true; // successful fetch.
   }
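
Local fetches pass (0, 0) for the start and end timestamps, contributing an empty interval to the tracker; copySucceeded clamps the per-copy duration to 1 ms, so local copies do not divide by zero when computing the individual rate.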

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9203ea7e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
index e48a73a..985a1e1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java
@@ -23,6 +23,7 @@ import java.net.URI;
 import java.net.UnknownHostException;
 import java.text.DecimalFormat;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -64,7 +65,8 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
   private static final long INITIAL_PENALTY = 10000;
   private static final float PENALTY_GROWTH_RATE = 1.3f;
   private final static int REPORT_FAILURE_LIMIT = 10;
-
+  private static final float BYTES_PER_MILLIS_TO_MBS = 1000f / 1024 / 1024;
+  
   private final boolean[] finishedMaps;
 
   private final int totalMaps;
@@ -92,6 +94,8 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
   private final long startTime;
   private long lastProgressTime;
 
+  private final CopyTimeTracker copyTimeTracker;
+
   private volatile int maxMapRuntime = 0;
   private final int maxFailedUniqueFetches;
   private final int maxFetchFailuresBeforeReporting;
@@ -112,7 +116,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
                           Counters.Counter failedShuffleCounter) {
     totalMaps = job.getNumMapTasks();
     abortFailureLimit = Math.max(30, totalMaps / 10);
-
+    copyTimeTracker = new CopyTimeTracker();
     remainingMaps = totalMaps;
     finishedMaps = new boolean[remainingMaps];
     this.reporter = reporter;
@@ -180,7 +184,8 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
   public synchronized void copySucceeded(TaskAttemptID mapId,
                                          MapHost host,
                                          long bytes,
-                                         long millis,
+                                         long startMillis,
+                                         long endMillis,
                                          MapOutput<K,V> output
                                          ) throws IOException {
     failureCounts.remove(mapId);
@@ -195,29 +200,48 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
         notifyAll();
       }
 
-      // update the status
+      // update single copy task status
+      long copyMillis = (endMillis - startMillis);
+      if (copyMillis == 0) copyMillis = 1;
+      float bytesPerMillis = (float) bytes / copyMillis;
+      float transferRate = bytesPerMillis * BYTES_PER_MILLIS_TO_MBS;
+      String individualProgress = "copy task(" + mapId + " succeeded"
+          + " at " + mbpsFormat.format(transferRate) + " MB/s)";
+      // update the aggregated status
+      copyTimeTracker.add(startMillis, endMillis);
+
       totalBytesShuffledTillNow += bytes;
-      updateStatus();
+      updateStatus(individualProgress);
       reduceShuffleBytes.increment(bytes);
       lastProgressTime = Time.monotonicNow();
       LOG.debug("map " + mapId + " done " + status.getStateString());
     }
   }
 
-  private void updateStatus() {
-    float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
+  private synchronized void updateStatus(String individualProgress) {
     int mapsDone = totalMaps - remainingMaps;
-    long secsSinceStart = (Time.monotonicNow() - startTime) / 1000 + 1;
-
-    float transferRate = mbs / secsSinceStart;
+    long totalCopyMillis = copyTimeTracker.getCopyMillis();
+    if (totalCopyMillis == 0) totalCopyMillis = 1;
+    float bytesPerMillis = (float) totalBytesShuffledTillNow / totalCopyMillis;
+    float transferRate = bytesPerMillis * BYTES_PER_MILLIS_TO_MBS;
     progress.set((float) mapsDone / totalMaps);
     String statusString = mapsDone + " / " + totalMaps + " copied.";
     status.setStateString(statusString);
 
-    progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at "
-        + mbpsFormat.format(transferRate) + " MB/s)");
+    if (individualProgress != null) {
+      progress.setStatus(individualProgress + " Aggregated copy rate(" + 
+          mapsDone + " of " + totalMaps + " at " + 
+          mbpsFormat.format(transferRate) + " MB/s)");
+    } else {
+      progress.setStatus("copy(" + mapsDone + " of " + totalMaps + " at "
+          + mbpsFormat.format(transferRate) + " MB/s)");
+    }
   }
   
+  private void updateStatus() {
+    updateStatus(null);
+  }
+
   public synchronized void hostFailed(String hostname) {
     if (hostFailures.containsKey(hostname)) {
       IntWritable x = hostFailures.get(hostname);
@@ -520,4 +544,63 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
   public int getMaxHostFailures() {
     return maxHostFailures;
   }
+
+  private static class CopyTimeTracker {
+    List<Interval> intervals;
+    long copyMillis;
+    public CopyTimeTracker() {
+      intervals = Collections.emptyList();
+      copyMillis = 0;
+    }
+    public void add(long s, long e) {
+      Interval interval = new Interval(s, e);
+      copyMillis = getTotalCopyMillis(interval);
+    }
+  
+    public long getCopyMillis() {
+      return copyMillis;
+    }
+    // This method captures the time during which any copy was in progress;
+    // each copy time period is recorded in the Interval list.
+    private long getTotalCopyMillis(Interval newInterval) {
+      if (newInterval == null) {
+        return copyMillis;
+      }
+      List<Interval> result = new ArrayList<Interval>(intervals.size() + 1);
+      for (Interval interval: intervals) {
+        if (interval.end < newInterval.start) {
+          result.add(interval);
+        } else if (interval.start > newInterval.end) {
+          result.add(newInterval);
+          newInterval = interval;        
+        } else {
+          newInterval = new Interval(
+              Math.min(interval.start, newInterval.start),
+              Math.max(newInterval.end, interval.end));
+        }
+      }
+      result.add(newInterval);
+      intervals = result;
+      
+      //compute total millis
+      long length = 0;
+      for (Interval interval : intervals) {
+        length += interval.getIntervalLength();
+      }
+      return length;
+    }
+    
+    private static class Interval {
+      final long start;
+      final long end;
+      public Interval(long s, long e) {
+        start = s;
+        end = e;
+      }
+      
+      public long getIntervalLength() {
+        return end - start;
+      }
+    }
+  }
 }
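
The interval-merging logic above is easier to follow in isolation. Below is a minimal, self-contained sketch that mirrors CopyTimeTracker's algorithm (illustrative class and method names, not the shipped code), fed the first three copy windows used by the test further down:

import java.util.ArrayList;
import java.util.List;

public class IntervalUnionDemo {
  static class Interval {
    final long start, end;
    Interval(long s, long e) { start = s; end = e; }
  }

  private List<Interval> intervals = new ArrayList<Interval>();

  // Merge the new window into the sorted, disjoint list, then return the
  // total milliseconds covered by at least one copy.
  long add(long start, long end) {
    Interval added = new Interval(start, end);
    List<Interval> result = new ArrayList<Interval>(intervals.size() + 1);
    for (Interval cur : intervals) {
      if (cur.end < added.start) {
        result.add(cur);                  // cur lies entirely before the new window
      } else if (cur.start > added.end) {
        result.add(added);                // new window slots in before cur
        added = cur;
      } else {                            // overlap or touch: widen the new window
        added = new Interval(Math.min(cur.start, added.start),
                             Math.max(cur.end, added.end));
      }
    }
    result.add(added);
    intervals = result;
    long total = 0;
    for (Interval i : intervals) {
      total += i.end - i.start;
    }
    return total;
  }

  public static void main(String[] args) {
    IntervalUnionDemo tracker = new IntervalUnionDemo();
    System.out.println(tracker.add(60000, 100000)); // 40000:  [60s,100s]
    System.out.println(tracker.add(0, 50000));      // 90000:  [0,50s] and [60s,100s]
    System.out.println(tracker.add(25000, 80000));  // 100000: merged into [0,100s]
  }
}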

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9203ea7e/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
index 355a419..905fd44 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
@@ -17,9 +17,20 @@
  */
 package org.apache.hadoop.mapreduce.task.reduce;
 
+import static org.mockito.Mockito.mock;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
+import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.Task.CombineOutputCollector;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -66,4 +77,150 @@ public class TestShuffleScheduler {
         0.0f);
     Assert.assertTrue(scheduler.waitUntilDone(1));
   }
+  
+  @SuppressWarnings("rawtypes")
+  @Test
+  public <K, V> void TestAggregatedTransferRate() throws Exception {
+    JobConf job = new JobConf();
+    job.setNumMapTasks(10);
+    //mock creation
+    TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
+    Reporter mockReporter = mock(Reporter.class);
+    FileSystem mockFileSystem = mock(FileSystem.class);
+    Class<? extends org.apache.hadoop.mapred.Reducer>  combinerClass = job.getCombinerClass();
+    @SuppressWarnings("unchecked")  // needed for mock with generic
+    CombineOutputCollector<K, V>  mockCombineOutputCollector =
+        (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
+    org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
+        mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
+    LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
+    CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
+    Counter mockCounter = mock(Counter.class);
+    TaskStatus mockTaskStatus = mock(TaskStatus.class);
+    Progress mockProgress = mock(Progress.class);
+    MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
+    Task mockTask = mock(Task.class);
+    @SuppressWarnings("unchecked")
+    MapOutput<K, V> output = mock(MapOutput.class);
+     
+    ShuffleConsumerPlugin.Context<K, V> context =
+        new ShuffleConsumerPlugin.Context<K, V>(mockTaskAttemptID, job, mockFileSystem,
+                                                     mockUmbilical, mockLocalDirAllocator,
+                                                     mockReporter, mockCompressionCodec,
+                                                     combinerClass, mockCombineOutputCollector,
+                                                     mockCounter, mockCounter, mockCounter,
+                                                     mockCounter, mockCounter, mockCounter,
+                                                     mockTaskStatus, mockProgress, mockProgress,
+                                                     mockTask, mockMapOutputFile, null);
+    TaskStatus status = new TaskStatus() {
+      @Override
+      public boolean getIsMap() {
+        return false;
+      }
+      @Override
+      public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+      }
+    };
+    Progress progress = new Progress();
+    ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job, status, null,
+        null, progress, context.getShuffledMapsCounter(),
+        context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
+    TaskAttemptID attemptID0 = new TaskAttemptID( 
+        new org.apache.hadoop.mapred.TaskID(
+        new JobID("test",0), TaskType.MAP, 0), 0);
+     
+    //adding the 1st interval, 40MB from 60s to 100s
+    long bytes = (long)40 * 1024 * 1024;
+    scheduler.copySucceeded(attemptID0, new MapHost(null, null), bytes, 60000, 100000, output);
+    Assert.assertEquals("copy task(attempt_test_0000_m_000000_0 succeeded at 1.00 MB/s)"
+        + " Aggregated copy rate(1 of 10 at 1.00 MB/s)", progress.toString());
+     
+    TaskAttemptID attemptID1 = new TaskAttemptID( 
+        new org.apache.hadoop.mapred.TaskID(
+        new JobID("test",0), TaskType.MAP, 1), 1);
+     
+    //adding the 2nd interval before the 1st interval, 50MB from 0s to 50s
+    bytes = (long)50 * 1024 * 1024;
+    scheduler.copySucceeded(attemptID1, new MapHost(null, null), bytes, 0, 50000, output);
+    Assert.assertEquals("copy task(attempt_test_0000_m_000001_1 succeeded at 1.00 MB/s)"
+        + " Aggregated copy rate(2 of 10 at 1.00 MB/s)", progress.toString());
+     
+    TaskAttemptID attemptID2 = new TaskAttemptID( 
+        new org.apache.hadoop.mapred.TaskID(
+        new JobID("test",0), TaskType.MAP, 2), 2);
+         
+    //adding the 3rd interval overlapping with the 1st and the 2nd interval
+    //110MB from 25s to 80s
+    bytes = (long)110 * 1024 * 1024;
+    scheduler.copySucceeded(attemptID2, new MapHost(null, null), bytes, 25000, 80000, output);
+    Assert.assertEquals("copy task(attempt_test_0000_m_000002_2 succeeded at 2.00 MB/s)"
+        + " Aggregated copy rate(3 of 10 at 2.00 MB/s)", progress.toString());
+     
+    TaskAttemptID attemptID3 = new TaskAttemptID( 
+        new org.apache.hadoop.mapred.TaskID(
+        new JobID("test",0), TaskType.MAP, 3), 3);
+         
+    //adding the 4th interval just after the 1st interval, 100MB from 100s to 300s
+    bytes = (long)100 * 1024 * 1024;
+    scheduler.copySucceeded(attemptID3, new MapHost(null, null), bytes, 100000, 300000, output);
+    Assert.assertEquals("copy task(attempt_test_0000_m_000003_3 succeeded at 0.50 MB/s)"
+        + " Aggregated copy rate(4 of 10 at 1.00 MB/s)", progress.toString());
+     
+    TaskAttemptID attemptID4 = new TaskAttemptID( 
+        new org.apache.hadoop.mapred.TaskID(
+        new JobID("test",0), TaskType.MAP, 4), 4);
+         
+    //adding the 5th interval just after the 4th, 50MB from 350s to 400s
+    bytes = (long)50 * 1024 * 1024;
+    scheduler.copySucceeded(attemptID4, new MapHost(null, null), bytes, 350000, 400000, output);
+    Assert.assertEquals("copy task(attempt_test_0000_m_000004_4 succeeded at 1.00 MB/s)"
+        + " Aggregated copy rate(5 of 10 at 1.00 MB/s)", progress.toString());
+
+
+    TaskAttemptID attemptID5 = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+        new JobID("test",0), TaskType.MAP, 5), 5);
+    //adding the 6th interval just after the 5th, 50MB from 450s to 500s
+    bytes = (long)50 * 1024 * 1024;
+    scheduler.copySucceeded(attemptID5, new MapHost(null, null), bytes, 450000, 500000, output);
+    Assert.assertEquals("copy task(attempt_test_0000_m_000005_5 succeeded at 1.00 MB/s)"
+        + " Aggregated copy rate(6 of 10 at 1.00 MB/s)", progress.toString());
+
+    TaskAttemptID attemptID6 = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+        new JobID("test",0), TaskType.MAP, 6), 6);
+    //adding the 7th interval between the 4th and the 5th interval, 20MB from 320s to 340s
+    bytes = (long)20 * 1024 * 1024;
+    scheduler.copySucceeded(attemptID6, new MapHost(null, null), bytes, 320000, 340000, output);
+    Assert.assertEquals("copy task(attempt_test_0000_m_000006_6 succeeded at 1.00 MB/s)"
+        + " Aggregated copy rate(7 of 10 at 1.00 MB/s)", progress.toString());
+
+    TaskAttemptID attemptID7 = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+        new JobID("test",0), TaskType.MAP, 7), 7);
+    //adding the 8th interval overlapping with the 4th, 5th, and 7th, 30MB from 290s to 350s
+    bytes = (long)30 * 1024 * 1024;
+    scheduler.copySucceeded(attemptID7, new MapHost(null, null), bytes, 290000, 350000, output);
+    Assert.assertEquals("copy task(attempt_test_0000_m_000007_7 succeeded at 0.50 MB/s)"
+        + " Aggregated copy rate(8 of 10 at 1.00 MB/s)", progress.toString());
+
+    TaskAttemptID attemptID8 = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+        new JobID("test",0), TaskType.MAP, 8), 8);
+    //adding the 9th interval overlapping with 5th and 6th, 50MB from 400s to 450s
+    bytes = (long)50 * 1024 * 1024;
+    scheduler.copySucceeded(attemptID8, new MapHost(null, null), bytes, 400000, 450000, output);
+    Assert.assertEquals("copy task(attempt_test_0000_m_000008_8 succeeded at 1.00 MB/s)"
+        + " Aggregated copy rate(9 of 10 at 1.00 MB/s)", progress.toString());
+
+    TaskAttemptID attemptID9 = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+        new JobID("test",0), TaskType.MAP, 9), 9);
+    //adding the 10th interval overlapping with all intervals, 500MB from 0s to 500s
+    bytes = (long)500 * 1024 * 1024;
+    scheduler.copySucceeded(attemptID9, new MapHost(null, null), bytes, 0, 500000, output);
+    Assert.assertEquals("copy task(attempt_test_0000_m_000009_9 succeeded at 1.00 MB/s)"
+        + " Aggregated copy rate(10 of 10 at 2.00 MB/s)", progress.toString());
+
+  }
 }
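
To see where the asserted strings come from, here is the arithmetic behind the third assertion above, re-expressed as a standalone snippet (variable names are illustrative, not part of the patch):

// Union of the first three copy windows: [0,50s] U [25s,80s] U [60s,100s] = [0,100s]
long unionMillis = 100000L;
long totalBytes = (40L + 50L + 110L) * 1024 * 1024;      // 200 MB shuffled so far
float bytesPerMillis = (float) totalBytes / unionMillis; // 2097.152 bytes/ms
float mbps = bytesPerMillis * 1000f / 1024 / 1024;       // 2.00 MB/s, as asserted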

