tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject [44/50] [abbrv] tez git commit: Merge remote-tracking branch 'origin/master' into TEZ-3334-MERGE1
Date Wed, 24 May 2017 21:08:13 GMT
Merge remote-tracking branch 'origin/master' into TEZ-3334-MERGE1


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/651257fc
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/651257fc
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/651257fc

Branch: refs/heads/master
Commit: 651257fc69f6af5a58a4589b002ce593b7fa1187
Parents: e1a9c28 68fe023
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Thu May 4 16:40:16 2017 -0500
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Thu May 4 16:40:16 2017 -0500

----------------------------------------------------------------------
 BUILDING.txt                                    |   21 +-
 docs/src/site/markdown/by-laws.md               |    2 +-
 docs/src/site/markdown/install.md               |    8 +-
 .../hadoop-shim-2.4/findbugs-exclude.xml        |   16 -
 hadoop-shim-impls/hadoop-shim-2.4/pom.xml       |   56 -
 .../hadoop/shim/HadoopShim23_24Provider.java    |   33 -
 .../apache/tez/hadoop/shim/HadoopShim24.java    |   45 -
 ...rg.apache.tez.hadoop.shim.HadoopShimProvider |   14 -
 .../shim/TestHadoop23_24ShimProvider.java       |   82 -
 .../hadoop-shim-2.6/findbugs-exclude.xml        |   16 -
 hadoop-shim-impls/hadoop-shim-2.6/pom.xml       |   56 -
 .../hadoop/shim/HadoopShim25_26_27Provider.java |   33 -
 .../apache/tez/hadoop/shim/HadoopShim26.java    |   52 -
 ...rg.apache.tez.hadoop.shim.HadoopShimProvider |   14 -
 .../shim/TestHadoop25_26_27ShimProvider.java    |   81 -
 .../hadoop-shim-2.7/findbugs-exclude.xml        |   16 +
 hadoop-shim-impls/hadoop-shim-2.7/pom.xml       |   56 +
 .../hadoop/shim/HadoopShim25_26_27Provider.java |   33 +
 .../apache/tez/hadoop/shim/HadoopShim27.java    |   52 +
 ...rg.apache.tez.hadoop.shim.HadoopShimProvider |   14 +
 .../shim/TestHadoop25_26_27ShimProvider.java    |   81 +
 hadoop-shim-impls/pom.xml                       |   13 +-
 pom.xml                                         |    4 +-
 tez-api/pom.xml                                 |   30 +-
 .../org/apache/tez/client/TezClientUtils.java   |   26 +
 .../org/apache/tez/common/ATSConstants.java     |    1 +
 .../main/java/org/apache/tez/dag/api/DAG.java   |   70 +-
 .../tez/dag/api/EdgeManagerPluginContext.java   |    4 +
 .../apache/tez/dag/api/TezConfiguration.java    |   18 +
 .../tez/dag/api/VertexManagerPluginContext.java |    9 +
 .../apache/tez/dag/api/client/DAGStatus.java    |    4 +-
 .../apache/tez/dag/api/client/VertexStatus.java |    4 +-
 .../org/apache/tez/runtime/api/TaskContext.java |   17 +-
 .../org/apache/tez/dag/api/TestDAGVerify.java   |    3 +-
 .../api/client/TestTimelineReaderFactory.java   |   29 -
 .../event/TestCompositeDataMovementEvent.java   |    5 +-
 .../org/apache/tez/common/AsyncDispatcher.java  |   17 +-
 .../tez/common/AsyncDispatcherConcurrent.java   |   12 +-
 .../org/apache/tez/common/TezExecutors.java     |   52 +
 .../apache/tez/common/TezSharedExecutor.java    |  348 ++
 .../tez/dag/history/logging/EntityTypes.java    |    1 +
 .../tez/dag/utils/RelocalizationUtils.java      |   26 -
 .../tez/util/TezMxBeanResourceCalculator.java   |    4 +-
 .../tez/common/TestTezSharedExecutor.java       |  240 +
 .../util/TestTezMxBeanResourceCalculator.java   |    4 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   13 +-
 .../apache/tez/dag/app/dag/DAGScheduler.java    |   27 +-
 .../java/org/apache/tez/dag/app/dag/Vertex.java |    2 +-
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |   12 +
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |    4 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   13 +-
 .../tez/dag/app/dag/impl/VertexManager.java     |   14 +
 .../serviceplugins/api/TaskCommunicator.java    |   14 -
 .../api/TaskCommunicatorContext.java            |   14 -
 .../api/TaskHeartbeatRequest.java               |   14 -
 .../api/TaskHeartbeatResponse.java              |   14 -
 .../tez/dag/app/dag/impl/TestDAGScheduler.java  |   60 +-
 .../apache/tez/dag/app/dag/impl/TestEdge.java   |   32 +
 .../tez/dag/app/dag/impl/TestTaskImpl.java      |   42 +
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |   21 +-
 .../tez/dag/app/dag/impl/TestVertexManager.java |   26 +
 tez-dist/pom.xml                                |   22 +-
 .../tez/service/impl/ContainerRunnerImpl.java   |   15 +-
 .../apache/tez/service/impl/TezTestService.java |    8 +-
 .../tez/mapreduce/output/TestMROutput.java      |   15 +-
 .../tez/mapreduce/processor/MapUtils.java       |    5 +-
 .../processor/map/TestMapProcessor.java         |   29 +-
 .../processor/reduce/TestReduceProcessor.java   |    7 +-
 tez-plugins/pom.xml                             |   12 +-
 .../org/apache/tez/history/ATSImportTool.java   |   19 +-
 .../logging/ats/TimelineCachePluginImpl.java    |    4 +-
 .../ats/TestTimelineCachePluginImpl.java        |    2 +
 .../ats/ATSV15HistoryLoggingService.java        |   11 +-
 .../ats/TestATSV15HistoryLoggingService.java    |   10 +-
 tez-plugins/tez-yarn-timeline-history/pom.xml   |    6 +-
 .../logging/ats/ATSHistoryLoggingService.java   |    8 +-
 .../ats/HistoryEventTimelineConversion.java     |  150 +-
 .../ats/TestATSHistoryLoggingService.java       |    8 +-
 .../ats/TestHistoryEventTimelineConversion.java |  168 +-
 .../runtime/LogicalIOProcessorRuntimeTask.java  |   14 +-
 .../runtime/api/impl/TezInputContextImpl.java   |    7 +-
 .../runtime/api/impl/TezOutputContextImpl.java  |    7 +-
 .../api/impl/TezProcessorContextImpl.java       |    7 +-
 .../runtime/api/impl/TezTaskContextImpl.java    |   13 +-
 .../tez/runtime/metrics/TaskCounterUpdater.java |    4 +-
 .../org/apache/tez/runtime/task/TezChild.java   |    6 +-
 .../apache/tez/runtime/task/TezTaskRunner2.java |   31 +-
 .../TestLogicalIOProcessorRuntimeTask.java      |   12 +-
 .../runtime/api/impl/TestProcessorContext.java  |   14 +-
 .../tez/runtime/task/TestTaskExecution2.java    |   14 +-
 .../tez/runtime/task/TestTezTaskRunner2.java    |    8 +-
 .../CartesianProductCombination.java            |   78 +-
 .../CartesianProductConfig.java                 |   60 +-
 .../CartesianProductEdgeManagerConfig.java      |   39 +-
 .../CartesianProductEdgeManagerPartitioned.java |    8 +-
 .../CartesianProductEdgeManagerReal.java        |    1 -
 ...artesianProductEdgeManagerUnpartitioned.java |   57 +-
 .../CartesianProductVertexManager.java          |   51 +-
 .../CartesianProductVertexManagerConfig.java    |   40 +-
 ...artesianProductVertexManagerPartitioned.java |   10 +-
 ...tesianProductVertexManagerUnpartitioned.java |  316 +-
 .../library/common/shuffle/FetchResult.java     |   17 -
 .../orderedgrouped/FetcherOrderedGrouped.java   |   13 +-
 .../orderedgrouped/ShuffleClientMetrics.java    |   92 -
 .../orderedgrouped/ShuffleScheduler.java        |   89 +-
 .../writers/UnorderedPartitionedKVWriter.java   |  111 +-
 .../runtime/library/input/UnorderedKVInput.java |    2 +-
 .../main/proto/CartesianProductPayload.proto    |    9 +-
 .../TestShuffleVertexManagerUtils.java          |    5 +
 .../TestCartesianProductCombination.java        |   30 +-
 .../TestCartesianProductConfig.java             |    6 +-
 .../TestCartesianProductEdgeManager.java        |    6 +-
 .../TestCartesianProductEdgeManagerConfig.java  |   15 +-
 ...tCartesianProductEdgeManagerPartitioned.java |   13 +-
 ...artesianProductEdgeManagerUnpartitioned.java |  408 +-
 .../TestCartesianProductVertexManager.java      |   19 +
 ...TestCartesianProductVertexManagerConfig.java |    8 +-
 ...tesianProductVertexManagerUnpartitioned.java |  565 +-
 .../shuffle/orderedgrouped/TestFetcher.java     |   33 +-
 .../orderedgrouped/TestMergeManager.java        |   26 +-
 ...tShuffleInputEventHandlerOrderedGrouped.java |   48 +-
 .../output/TestOnFileUnorderedKVOutput.java     |   23 +-
 tez-tests/pom.xml                               |    4 +
 tez-tools/analyzers/pom.xml                     |   11 +-
 tez-ui/README.md                                |   26 +-
 tez-ui/pom.xml                                  |   67 +-
 tez-ui/src/main/webapp/.bowerrc                 |    5 +-
 tez-ui/src/main/webapp/README.md                |   26 +-
 tez-ui/src/main/webapp/app/adapters/dag-info.js |   22 +
 .../app/components/home-table-controls.js       |   40 +
 .../main/webapp/app/controllers/dag/vertices.js |    2 +-
 .../main/webapp/app/controllers/home/index.js   |    6 +-
 tez-ui/src/main/webapp/app/controllers/table.js |    2 +-
 tez-ui/src/main/webapp/app/entities/entity.js   |   10 +-
 tez-ui/src/main/webapp/app/models/dag-info.js   |   28 +
 tez-ui/src/main/webapp/app/models/dag.js        |   30 +-
 .../src/main/webapp/app/routes/dag/counters.js  |    3 +
 .../src/main/webapp/app/routes/dag/graphical.js |    3 +
 tez-ui/src/main/webapp/app/routes/dag/index.js  |    3 +
 .../src/main/webapp/app/routes/dag/swimlane.js  |    3 +
 tez-ui/src/main/webapp/app/routes/home/index.js |   18 +
 .../src/main/webapp/app/serializers/dag-info.js |   60 +
 tez-ui/src/main/webapp/app/serializers/dag.js   |   47 +-
 tez-ui/src/main/webapp/app/styles/app.less      |    1 +
 .../webapp/app/styles/home-table-controls.less  |   22 +
 .../webapp/app/styles/queries-page-search.less  |    1 +
 tez-ui/src/main/webapp/app/styles/shared.less   |    6 +
 .../components/home-table-controls.hbs          |   24 +
 .../main/webapp/app/templates/home/index.hbs    |    1 +
 .../main/webapp/app/utils/download-dag-zip.js   |    9 +
 tez-ui/src/main/webapp/bower-shrinkwrap.json    |   71 +
 .../src/main/webapp/config/default-app-conf.js  |    3 +-
 tez-ui/src/main/webapp/package.json             |    6 +-
 .../components/home-table-controls-test.js      |   80 +
 .../webapp/tests/unit/adapters/dag-info-test.js |   30 +
 .../tests/unit/controllers/home/index-test.js   |    2 +-
 .../webapp/tests/unit/models/dag-info-test.js   |   35 +
 .../main/webapp/tests/unit/models/dag-test.js   |   44 +
 .../webapp/tests/unit/routes/home/index-test.js |   24 +
 .../tests/unit/serializers/dag-info-test.js     |  114 +
 .../webapp/tests/unit/serializers/dag-test.js   |   26 +-
 .../tests/unit/serializers/timeline-test.js     |    2 +
 tez-ui/src/main/webapp/yarn.lock                | 4945 ++++++++++++++++++
 163 files changed, 8752 insertions(+), 1988 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index 76b83f5,0307a04..7f3ad2e
--- a/pom.xml
+++ b/pom.xml
@@@ -37,9 -37,8 +37,9 @@@
    <properties>
      <maven.test.redirectTestOutputToFile>true</maven.test.redirectTestOutputToFile>
      <clover.license>${user.home}/clover.license</clover.license>
-     <hadoop.version>2.6.0</hadoop.version>
+     <hadoop.version>2.7.0</hadoop.version>
      <jetty.version>6.1.26</jetty.version>
 +    <netty.version>3.6.2.Final</netty.version>
      <pig.version>0.13.0</pig.version>
      <javac.version>1.7</javac.version>
      <slf4j.version>1.7.10</slf4j.version>
@@@ -56,10 -55,9 +56,10 @@@
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <scm.url>scm:git:https://git-wip-us.apache.org/repos/asf/tez.git</scm.url>
      <build.time>${maven.build.timestamp}</build.time>
-     <frontend-maven-plugin.version>1.1</frontend-maven-plugin.version>
+     <frontend-maven-plugin.version>1.2</frontend-maven-plugin.version>
      <findbugs-maven-plugin.version>3.0.1</findbugs-maven-plugin.version>
      <javadoc-maven-plugin.version>2.9.1</javadoc-maven-plugin.version>
 +    <shade-maven-plugin.version>2.4.3</shade-maven-plugin.version>
    </properties>
    <scm>
      <connection>${scm.url}</connection>

http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
----------------------------------------------------------------------
diff --cc tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
index 8d9436e,17eb88c..85e9227
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/impl/TezTestService.java
@@@ -23,7 -23,8 +23,9 @@@ import com.google.common.base.Precondit
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.service.AbstractService;
  import org.apache.hadoop.util.StringUtils;
 +import org.apache.tez.dag.api.TezConfiguration;
+ import org.apache.tez.common.TezExecutors;
+ import org.apache.tez.common.TezSharedExecutor;
  import org.apache.tez.dag.api.TezException;
  import org.apache.tez.service.ContainerRunner;
  import org.apache.tez.shufflehandler.ShuffleHandler;

http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --cc tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 9c115b9,eb30841..bd83dfa
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@@ -171,11 -159,10 +159,12 @@@ public class TestMapProcessor 
      task.initialize();
      task.run();
      task.close();
-     
+     sharedExecutor.shutdownNow();
+ 
      OutputContext outputContext = task.getOutputContexts().iterator().next();
 -    TezTaskOutput mapOutputs = new TezTaskOutputFiles(jobConf, outputContext.getUniqueIdentifier());
 +    TezTaskOutput mapOutputs = new TezTaskOutputFiles(
 +        jobConf, outputContext.getUniqueIdentifier(),
 +        outputContext.getDagIdentifier());
      
      
      // TODO NEWTEZ FIXME OutputCommitter verification

http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-plugins/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/FetchResult.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 4b7c7fd,5cad6fc..b762c75
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@@ -131,15 -125,12 +129,14 @@@ class FetcherOrderedGrouped extends Cal
                                 int dagId,
                                 boolean asyncHttp,
                                 boolean sslShuffle,
 -                               boolean verifyDiskChecksum) {
 +                               boolean verifyDiskChecksum,
 +                               boolean compositeFetch) {
      this.scheduler = scheduler;
      this.allocator = allocator;
-     this.metrics = metrics;
      this.exceptionReporter = exceptionReporter;
      this.mapHost = mapHost;
 -    this.currentPartition = this.mapHost.getPartitionId();
 +    this.minPartition = this.mapHost.getPartitionId();
 +    this.maxPartition = this.minPartition + this.mapHost.getPartitionCount() - 1;
      this.id = nextId.incrementAndGet();
      this.jobTokenSecretManager = jobTokenSecretMgr;
  
@@@ -533,47 -458,68 +527,45 @@@
            }
            return EMPTY_ATTEMPT_ID_ARRAY;
          }
 -      }
 -      
 -      if(LOG.isDebugEnabled()) {
 -        LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength + 
 -            ", decomp len: " + decompressedLength);
 -      }
  
 -      // Get the location for the map output - either in-memory or on-disk
 -      try {
 -        mapOutput = allocator.reserve(srcAttemptId, decompressedLength, compressedLength, id);
 -      } catch (IOException e) {
 -        if (!stopped) {
 -          // Kill the reduce attempt
 -          ioErrs.increment(1);
 -          scheduler.reportLocalError(e);
 -        } else {
 -          if (LOG.isDebugEnabled()) {
 -            LOG.debug("Already stopped. Ignoring error from merger.reserve");
 -          }
 +        // Check if we can shuffle *now* ...
 +        if (mapOutput.getType() == Type.WAIT) {
 +          LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
 +          //Not an error but wait to process data.
 +          return EMPTY_ATTEMPT_ID_ARRAY;
          }
 -        return EMPTY_ATTEMPT_ID_ARRAY;
 -      }
 -      
 -      // Check if we can shuffle *now* ...
 -      if (mapOutput.getType() == Type.WAIT) {
 -        LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
 -        //Not an error but wait to process data.
 -        return EMPTY_ATTEMPT_ID_ARRAY;
 -      } 
 -      
 -      // Go!
 -      if (LOG.isDebugEnabled()) {
 -        LOG.debug("fetcher#" + id + " about to shuffle output of map " +
 -            mapOutput.getAttemptIdentifier() + " decomp: " +
 -            decompressedLength + " len: " + compressedLength + " to " + mapOutput.getType());
 -      }
  
 -      if (mapOutput.getType() == Type.MEMORY) {
 -        ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input,
 -          (int) decompressedLength, (int) compressedLength, codec, ifileReadAhead,
 -          ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier());
 -      } else if (mapOutput.getType() == Type.DISK) {
 -        ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(),
 -          input, compressedLength, decompressedLength, LOG,
 -          mapOutput.getAttemptIdentifier(),
 -          ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum);
 -      } else {
 -        throw new IOException("Unknown mapOutput type while fetching shuffle data:" +
 -            mapOutput.getType());
 -      }
 +        // Go!
 +        if (LOG.isDebugEnabled()) {
 +          LOG.debug("fetcher#" + id + " about to shuffle output of map " +
 +              mapOutput.getAttemptIdentifier() + " decomp: " +
 +              decompressedLength + " len: " + compressedLength + " to " + mapOutput.getType());
 +        }
  
 -      // Inform the shuffle scheduler
 -      long endTime = System.currentTimeMillis();
 -      // Reset retryStartTime as map task make progress if retried before.
 -      retryStartTime = 0;
 +        if (mapOutput.getType() == Type.MEMORY) {
 +          ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input,
 +              (int) decompressedLength, (int) compressedLength, codec, ifileReadAhead,
 +              ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier());
 +        } else if (mapOutput.getType() == Type.DISK) {
 +          ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(),
 +              input, compressedLength, decompressedLength, LOG,
 +              mapOutput.getAttemptIdentifier(),
 +              ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum);
 +        } else {
 +          throw new IOException("Unknown mapOutput type while fetching shuffle data:" +
 +              mapOutput.getType());
 +        }
  
 -      scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength,
 -                              endTime - startTime, mapOutput, false);
 -      // Note successful shuffle
 -      remaining.remove(srcAttemptId.toString());
 -      return null;
 -    } catch (IOException ioe) {
 +        // Inform the shuffle scheduler
 +        long endTime = System.currentTimeMillis();
 +        // Reset retryStartTime as map task make progress if retried before.
 +        retryStartTime = 0;
 +
 +        scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength,
 +            endTime - startTime, mapOutput, false);
-         // Note successful shuffle
-         metrics.successFetch();
 +      }
 +      remaining.remove(inputAttemptIdentifier.toString());
 +    } catch(IOException ioe) {
        if (stopped) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Not reporting fetch failure for exception during data copy: ["
@@@ -609,10 -555,8 +601,9 @@@
  
        // Inform the shuffle-scheduler
        mapOutput.abort();
-       metrics.failedFetch();
-       return new InputAttemptIdentifier[]{srcAttemptId};
+       return new InputAttemptIdentifier[] {srcAttemptId};
      }
 +    return null;
    }
  
    /**
@@@ -713,47 -665,35 +704,45 @@@
          }
          InputAttemptIdentifier srcAttemptId = iter.next();
          MapOutput mapOutput = null;
 -        try {
 -          long startTime = System.currentTimeMillis();
 -          Path filename = getShuffleInputFileName(srcAttemptId.getPathComponent(), null);
 -
 -          TezIndexRecord indexRecord = getIndexRecord(srcAttemptId.getPathComponent(),
 -              currentPartition);
 -
 -          mapOutput = getMapOutputForDirectDiskFetch(srcAttemptId, filename, indexRecord);
 -          long endTime = System.currentTimeMillis();
 -          scheduler.copySucceeded(srcAttemptId, host, indexRecord.getPartLength(),
 -              indexRecord.getRawLength(), (endTime - startTime), mapOutput, true);
 -          iter.remove();
 -        } catch (IOException e) {
 -          if (mapOutput != null) {
 -            mapOutput.abort();
 -          }
 -          if (!stopped) {
 -            ioErrs.increment(1);
 -            scheduler.copyFailed(srcAttemptId, host, true, false, true);
 -            LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " +
 -                host.getHostIdentifier(), e);
 -          } else {
 -            if (LOG.isDebugEnabled()) {
 -              LOG.debug(
 -                  "Ignoring fetch error during local disk copy since fetcher has already been stopped");
 +        boolean hasFailures = false;
 +        // Fetch partition count number of map outputs (handles auto-reduce case)
 +        for (int curPartition = minPartition; curPartition <= maxPartition; curPartition++) {
 +          try {
 +            long startTime = System.currentTimeMillis();
 +
 +            // Partition id is the base partition id plus the relative offset
 +            int reduceId = host.getPartitionId() + curPartition - minPartition;
 +            srcAttemptId = scheduler.getIdentifierForFetchedOutput(srcAttemptId.getPathComponent(), reduceId);
 +            Path filename = getShuffleInputFileName(srcAttemptId.getPathComponent(), null);
 +            TezIndexRecord indexRecord = getIndexRecord(srcAttemptId.getPathComponent(), reduceId);
 +
 +            mapOutput = getMapOutputForDirectDiskFetch(srcAttemptId, filename, indexRecord);
 +            long endTime = System.currentTimeMillis();
 +            scheduler.copySucceeded(srcAttemptId, host, indexRecord.getPartLength(),
 +                indexRecord.getRawLength(), (endTime - startTime), mapOutput, true);
-             metrics.successFetch();
 +          } catch (IOException e) {
 +            if (mapOutput != null) {
 +              mapOutput.abort();
              }
 -            return;
 +            if (!stopped) {
 +              hasFailures = true;
-               metrics.failedFetch();
 +              ioErrs.increment(1);
 +              scheduler.copyFailed(srcAttemptId, host, true, false, true);
 +              LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " +
 +                  host.getHostIdentifier(), e);
 +            } else {
 +              if (LOG.isDebugEnabled()) {
 +                LOG.debug(
 +                    "Ignoring fetch error during local disk copy since fetcher has already been stopped");
 +              }
 +              return;
 +            }
 +
            }
          }
 +        if (!hasFailures) {
 +          iter.remove();
 +        }
        }
      } finally {
        putBackRemainingMapOutputs(host);

http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 02bda68,73a6214..33fb0f4
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@@ -54,8 -55,6 +55,7 @@@ import com.google.common.util.concurren
  import com.google.common.util.concurrent.MoreExecutors;
  import com.google.common.util.concurrent.ThreadFactoryBuilder;
  import org.apache.hadoop.io.compress.CompressionCodec;
- import org.apache.hadoop.security.UserGroupInformation;
 +import org.apache.tez.dag.api.TezConfiguration;
  import org.apache.tez.http.HttpConnectionParams;
  import org.apache.tez.common.CallableWithNdc;
  import org.apache.tez.common.security.JobTokenSecretManager;
@@@ -375,12 -369,9 +374,9 @@@ class ShuffleScheduler 
          TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
      this.asyncHttp = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false);
      this.httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
-     this.shuffleMetrics = new ShuffleClientMetrics(inputContext.getDAGName(),
-         inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
-         this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
      SecretKey jobTokenSecret = ShuffleUtils
          .getJobTokenSecretFromTokenBytes(inputContext
 -            .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
 +            .getServiceConsumerMetaData(auxiliaryService));
      this.jobTokenSecretManager = new JobTokenSecretManager(jobTokenSecret);
  
      ExecutorService fetcherRawExecutor = Executors.newFixedThreadPool(numFetchers,
@@@ -413,9 -404,8 +409,9 @@@
      this.skippedInputCounter = inputContext.getCounters().findCounter(TaskCounter.NUM_SKIPPED_INPUTS);
      this.firstEventReceived = inputContext.getCounters().findCounter(TaskCounter.FIRST_EVENT_RECEIVED);
      this.lastEventReceived = inputContext.getCounters().findCounter(TaskCounter.LAST_EVENT_RECEIVED);
 +    this.compositeFetch = ShuffleUtils.isTezShuffleHandler(conf);
  
-     pipelinedShuffleInfoEventsMap = new HashMap<Integer, ShuffleEventInfo>();
+     pipelinedShuffleInfoEventsMap = Maps.newConcurrentMap();
      LOG.info("ShuffleScheduler running for sourceVertex: "
          + inputContext.getSourceVertexName() + " with configuration: "
          + "maxFetchFailuresBeforeReporting=" + maxFetchFailuresBeforeReporting
@@@ -1110,18 -1137,12 +1144,18 @@@
  
    public InputAttemptIdentifier getIdentifierForFetchedOutput(
        String path, int reduceId) {
 -    return pathToIdentifierMap.get(getIdentifierFromPathAndReduceId(path, reduceId));
 +    return pathToIdentifierMap.get(new PathPartition(path, reduceId));
    }
    
-   private boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
+   private synchronized boolean inputShouldBeConsumed(InputAttemptIdentifier id) {
 -    return (!obsoleteInputs.contains(id) && 
 -             !isInputFinished(id.getInputIdentifier()));
 +    boolean isInputFinished = false;
 +    if (id instanceof CompositeInputAttemptIdentifier) {
 +      CompositeInputAttemptIdentifier cid = (CompositeInputAttemptIdentifier)id;
 +      isInputFinished = isInputFinished(cid.getInputIdentifier(), cid.getInputIdentifier() + cid.getInputIdentifierCount());
 +    } else {
 +      isInputFinished = isInputFinished(id.getInputIdentifier());
 +    }
 +    return !obsoleteInputs.contains(id) && !isInputFinished;
    }
  
    public synchronized List<InputAttemptIdentifier> getMapsForHost(MapHost host) {

http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --cc tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index dfa473b,a9b57a9..ef371c2
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@@ -154,9 -149,9 +152,9 @@@ public class TestFetcher 
  
      final boolean ENABLE_LOCAL_FETCH = true;
      final boolean DISABLE_LOCAL_FETCH = false;
 -    MapHost mapHost = new MapHost(HOST, PORT, 0);
 +    MapHost mapHost = new MapHost(HOST, PORT, 0, 1);
      FetcherOrderedGrouped fetcher =
-         new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
+         new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
              null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
              wrongLengthErrsCounter, badIdErrsCounter,
              wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
@@@ -172,9 -167,9 +170,9 @@@
      verify(spyFetcher, never()).copyFromHost(any(MapHost.class));
  
      // if hostname does not match use http
 -    mapHost = new MapHost(HOST + "_OTHER", PORT, 0);
 +    mapHost = new MapHost(HOST + "_OTHER", PORT, 0, 1);
      fetcher =
-         new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false,
0,
+         new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
              null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
              wrongLengthErrsCounter, badIdErrsCounter,
              wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID,
DAG_ID,
@@@ -188,9 -183,9 +186,9 @@@
      verify(spyFetcher, times(1)).copyFromHost(mapHost);
  
      // if port does not match use http
 -    mapHost = new MapHost(HOST, PORT + 1, 0);
 +    mapHost = new MapHost(HOST, PORT + 1, 0, 1);
      fetcher =
-         new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false,
0,
+         new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
              null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
              wrongLengthErrsCounter, badIdErrsCounter,
              wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID,
DAG_ID,
@@@ -204,8 -199,8 +202,8 @@@
      verify(spyFetcher, times(1)).copyFromHost(mapHost);
  
      //if local fetch is not enabled
 -    mapHost = new MapHost(HOST, PORT, 0);
 +    mapHost = new MapHost(HOST, PORT, 0, 1);
-     fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null,
false, 0,
+     fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
          null, conf, DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
          wrongLengthErrsCounter, badIdErrsCounter,
          wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
@@@ -230,11 -224,11 +227,11 @@@
      when(inputContext.getCounters()).thenReturn(new TezCounters());
      when(inputContext.getSourceVertexName()).thenReturn("");
  
 -    MapHost host = new MapHost(HOST, PORT, 1);
 +    MapHost host = new MapHost(HOST, PORT, 1, 1);
-     FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics,
shuffle, null, false, 0,
+     FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle,
null, false, 0,
          null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter,
badIdErrsCounter,
          wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
 -        false, false, true);
 +        false, false, true, false);
      FetcherOrderedGrouped spyFetcher = spy(fetcher);
  
  
@@@ -318,151 -292,21 +315,144 @@@
  
      // should have exactly 3 success and 1 failure.
      for (int i : sucessfulAttemptsIndexes) {
 -      verifyCopySucceeded(scheduler, host, srcAttempts, i);
 +      for (int j = 0; j < host.getPartitionCount(); j++) {
 +        verifyCopySucceeded(scheduler, host, srcAttempts, i, j);
 +      }
      }
 -    verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX), host, true,
false, true);
 -    verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX), host, true,
false, true);
 +    verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0), host,
true, false, true);
 +    verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(0), host,
true, false, true);
  
-     verify(metrics, times(3)).successFetch();
-     verify(metrics, times(2)).failedFetch();
- 
      verify(spyFetcher).putBackRemainingMapOutputs(host);
      verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
      verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX));
    }
  
 +  @Test(timeout = 5000)
 +  public void testSetupLocalDiskFetchAutoReduce() throws Exception {
 +    Configuration conf = new TezConfiguration();
 +    ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
 +    MergeManager merger = mock(MergeManager.class);
-     ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
 +    Shuffle shuffle = mock(Shuffle.class);
 +    InputContext inputContext = mock(InputContext.class);
 +    when(inputContext.getCounters()).thenReturn(new TezCounters());
 +    when(inputContext.getSourceVertexName()).thenReturn("");
 +
 +    MapHost host = new MapHost(HOST, PORT, 1, 2);
-     FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics,
shuffle, null, false, 0,
++    FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, shuffle,
null, false, 0,
 +        null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter,
badIdErrsCounter,
 +        wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
 +        false, false, true, false);
 +    FetcherOrderedGrouped spyFetcher = spy(fetcher);
 +
 +
 +    final List<CompositeInputAttemptIdentifier> srcAttempts = Arrays.asList(
 +        new CompositeInputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0",
host.getPartitionCount()),
 +        new CompositeInputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1",
host.getPartitionCount()),
 +        new CompositeInputAttemptIdentifier(2, 3, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_2",
host.getPartitionCount()),
 +        new CompositeInputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3",
host.getPartitionCount()),
 +        new CompositeInputAttemptIdentifier(4, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_4",
host.getPartitionCount())
 +    );
 +    final int FIRST_FAILED_ATTEMPT_IDX = 2;
 +    final int SECOND_FAILED_ATTEMPT_IDX = 4;
 +    final int[] sucessfulAttemptsIndexes = { 0, 1, 3 };
 +
 +    doReturn(srcAttempts).when(scheduler).getMapsForHost(host);
 +    final ConcurrentMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier> pathToIdentifierMap
 +        = new ConcurrentHashMap<ShuffleScheduler.PathPartition, InputAttemptIdentifier>();
 +    for (CompositeInputAttemptIdentifier srcAttempt : srcAttempts) {
 +      for (int i = 0; i < srcAttempt.getInputIdentifierCount(); i++) {
 +        ShuffleScheduler.PathPartition pathPartition = new ShuffleScheduler.PathPartition(srcAttempt.getPathComponent(),
host.getPartitionId() + i);
 +        pathToIdentifierMap.put(pathPartition, srcAttempt.expand(i));
 +      }
 +    }
 +
 +    doAnswer(new Answer<InputAttemptIdentifier>() {
 +        @Override
 +        public InputAttemptIdentifier answer(InvocationOnMock invocation) throws Throwable
{
 +          Object[] args = invocation.getArguments();
 +          String path = (String) args[0];
 +          int reduceId = (int) args[1];
 +          return pathToIdentifierMap.get(new ShuffleScheduler.PathPartition(path, reduceId));
 +        }
 +      }).when(scheduler)
 +          .getIdentifierForFetchedOutput(any(String.class), any(int.class));
 +
 +    doAnswer(new Answer<MapOutput>() {
 +      @Override
 +      public MapOutput answer(InvocationOnMock invocation) throws Throwable {
 +        Object[] args = invocation.getArguments();
 +        MapOutput mapOutput = mock(MapOutput.class);
 +        doReturn(MapOutput.Type.DISK_DIRECT).when(mapOutput).getType();
 +        doReturn(args[0]).when(mapOutput).getAttemptIdentifier();
 +        return mapOutput;
 +      }
 +    }).when(spyFetcher)
 +        .getMapOutputForDirectDiskFetch(any(InputAttemptIdentifier.class), any(Path.class),
 +            any(TezIndexRecord.class));
 +
 +    doAnswer(new Answer<Path>() {
 +      @Override
 +      public Path answer(InvocationOnMock invocation) throws Throwable {
 +        Object[] args = invocation.getArguments();
 +        return new Path(SHUFFLE_INPUT_FILE_PREFIX + args[0]);
 +      }
 +    }).when(spyFetcher).getShuffleInputFileName(anyString(), anyString());
 +
 +    for (int i = 0; i < host.getPartitionCount(); i++) {
 +      doAnswer(new Answer<TezIndexRecord>() {
 +        @Override
 +        public TezIndexRecord answer(InvocationOnMock invocation) throws Throwable {
 +          Object[] args = invocation.getArguments();
 +          String pathComponent = (String) args[0];
 +          int len = pathComponent.length();
 +          long p = Long.valueOf(pathComponent.substring(len - 1, len));
 +
 +          if (pathComponent.equals(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).getPathComponent())
||
 +              pathComponent.equals(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).getPathComponent()))
{
 +            throw new IOException("Thowing exception to simulate failure case");
 +          }
 +          // match with params for copySucceeded below.
 +          return new TezIndexRecord(p * 10, p * 1000, p * 100);
 +        }
 +      }).when(spyFetcher).getIndexRecord(anyString(), eq(host.getPartitionId() + i));
 +    }
 +
 +    doNothing().when(scheduler).copySucceeded(any(InputAttemptIdentifier.class), any(MapHost.class),
 +        anyLong(), anyLong(), anyLong(), any(MapOutput.class), anyBoolean());
 +    doNothing().when(scheduler).putBackKnownMapOutput(host,
 +        srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0));
 +    doNothing().when(scheduler).putBackKnownMapOutput(host,
 +        srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(1));
 +    doNothing().when(scheduler).putBackKnownMapOutput(host,
 +        srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0));
 +    doNothing().when(scheduler).putBackKnownMapOutput(host,
 +        srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(1));
 +
 +    spyFetcher.setupLocalDiskFetch(host);
 +
 +    // should have exactly 3 success and 1 failure.
 +    for (int i : sucessfulAttemptsIndexes) {
 +      for (int j = 0; j < host.getPartitionCount(); j++) {
 +        verifyCopySucceeded(scheduler, host, srcAttempts, i, j);
 +      }
 +    }
 +    verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(0), host,
true, false, true);
 +    verify(scheduler).copyFailed(srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX).expand(1), host,
true, false, true);
 +    verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(0), host,
true, false, true);
 +    verify(scheduler).copyFailed(srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX).expand(1), host,
true, false, true);
 +
-     verify(metrics, times(6)).successFetch();
-     verify(metrics, times(4)).failedFetch();
- 
 +    verify(spyFetcher).putBackRemainingMapOutputs(host);
 +    verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
 +    verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX));
 +    verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(FIRST_FAILED_ATTEMPT_IDX));
 +    verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(SECOND_FAILED_ATTEMPT_IDX));
 +  }
 +
    private void verifyCopySucceeded(ShuffleScheduler scheduler, MapHost host,
 -      List<InputAttemptIdentifier> srcAttempts, long p) throws
 +      List<CompositeInputAttemptIdentifier> srcAttempts, long p, int j) throws
        IOException {
      // need to verify filename, offsets, sizes wherever they are used.
 -    InputAttemptIdentifier srcAttemptToMatch = srcAttempts.get((int) p);
 +    InputAttemptIdentifier srcAttemptToMatch = srcAttempts.get((int) p).expand(j);
      String filenameToMatch = SHUFFLE_INPUT_FILE_PREFIX + srcAttemptToMatch.getPathComponent();
      ArgumentCaptor<MapOutput> captureMapOutput = ArgumentCaptor.forClass(MapOutput.class);
      verify(scheduler).copySucceeded(eq(srcAttemptToMatch), eq(host), eq(p * 100),
@@@ -522,11 -365,11 +511,11 @@@
      when(inputContext.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 1));
  
      HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
 -    final MapHost host = new MapHost(HOST, PORT, 1);
 +    final MapHost host = new MapHost(HOST, PORT, 1, 1);
-     FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger,
metrics, shuffle, null, false, 0,
+     FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger,
shuffle, null, false, 0,
          null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter,
badIdErrsCounter,
          wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID, DAG_ID,
 -        false, false, true);
 +        false, false, true, false);
      final FetcherOrderedGrouped fetcher = spy(mockFetcher);
  
  
@@@ -611,9 -453,9 +599,9 @@@
      doReturn(new byte[10]).when(jobMgr).computeHash(any(byte[].class));
  
      HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
 -    final MapHost host = new MapHost(HOST, PORT, 1);
 +    final MapHost host = new MapHost(HOST, PORT, 1, 1);
      FetcherOrderedGrouped mockFetcher =
-         new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, metrics, shuffle,
jobMgr,
+         new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, shuffle, jobMgr,
              false, 0,
              null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter,
              wrongLengthErrsCounter, badIdErrsCounter,
@@@ -678,11 -520,10 +666,10 @@@
      Configuration conf = new TezConfiguration();
      ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
      MergeManager merger = mock(MergeManager.class);
-     ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
      Shuffle shuffle = mock(Shuffle.class);
 -    MapHost mapHost = new MapHost(HOST, PORT, 0);
 +    MapHost mapHost = new MapHost(HOST, PORT, 0, 1);
      FetcherOrderedGrouped fetcher =
-         new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false,
0,
+         new FetcherOrderedGrouped(null, scheduler, merger, shuffle, null, false, 0,
              null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
              wrongLengthErrsCounter, badIdErrsCounter,
              wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, APP_ID,
DAG_ID,

http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --cc tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
index 72cba80,695a307..ff5ceab
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@@ -240,10 -245,43 +247,43 @@@ public class TestShuffleInputEventHandl
      Event dme2 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true,
0, attemptNum);
      handler.handleEvents(Collections.singletonList(dme2));
  
-     InputAttemptIdentifier id2 =
-         new InputAttemptIdentifier(inputIdx, attemptNum,
-             PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE,
0);
-     verify(scheduler, times(1)).reportExceptionForInput(any(IOException.class));
+     // task should issue kill request
+     verify(scheduler, times(1)).killSelf(any(IOException.class), any(String.class));
+   }
+ 
+   @Test (timeout = 5000)
+   public void testPipelinedShuffle_WithObsoleteEvents() throws IOException, InterruptedException
{
+     //Process attempt #1 first
+     int attemptNum = 1;
+     int inputIdx = 1;
+ 
+     Event dme1 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true,
0, attemptNum);
+     handler.handleEvents(Collections.singletonList(dme1));
+ 
 -    InputAttemptIdentifier id1 =
 -        new InputAttemptIdentifier(inputIdx, attemptNum,
 -            PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE,
0);
++    CompositeInputAttemptIdentifier id1 =
++        new CompositeInputAttemptIdentifier(inputIdx, attemptNum,
++            PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE,
0, 1);
+ 
+     verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(id1));
+     assertTrue("Shuffle info events should not be empty for pipelined shuffle",
+         !scheduler.pipelinedShuffleInfoEventsMap.isEmpty());
+ 
+     int valuesInMapLocations = scheduler.mapLocations.values().size();
+     assertTrue("Maplocations should have values. current size: " + valuesInMapLocations,
+         valuesInMapLocations > 0);
+ 
+     // start scheduling for download. Sets up scheduledForDownload in eventInfo.
+     scheduler.getMapsForHost(scheduler.mapLocations.values().iterator().next());
+ 
+     // send input failed event.
+     List<Event> events = new LinkedList<Event>();
+     int targetIdx = 1;
+     InputFailedEvent failedEvent = InputFailedEvent.create(targetIdx, 0);
+     events.add(failedEvent);
+     handler.handleEvents(events);
+ 
+     // task should issue kill request, as inputs are scheduled for download already.
+     verify(scheduler, times(1)).killSelf(any(IOException.class), any(String.class));
    }
  
    @Test(timeout = 5000)

http://git-wip-us.apache.org/repos/asf/tez/blob/651257fc/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------


Mime
View raw message