tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject [35/50] [abbrv] tez git commit: Merge branch 'master' into TEZ-3334-MERGE1
Date Wed, 24 May 2017 21:08:04 GMT
Merge branch 'master' into TEZ-3334-MERGE1


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/be0d01f6
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/be0d01f6
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/be0d01f6

Branch: refs/heads/master
Commit: be0d01f6add2bbb20af73ae44909f2510e37eff9
Parents: c3a7c21 e375b9d
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Mon Mar 27 17:30:53 2017 -0500
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Mon Mar 27 17:30:53 2017 -0500

----------------------------------------------------------------------
 BUILDING.txt                                    |    5 +
 CHANGES.txt                                     | 2199 ------------------
 Tez_DOAP.rdf                                    |    7 +
 docs/pom.xml                                    |   18 +
 docs/src/site/markdown/install.md               |    2 +-
 .../site/markdown/releases/apache-tez-0-8-5.md  |   30 +
 docs/src/site/markdown/releases/index.md        |    1 +
 docs/src/site/site.xml                          |    2 +-
 .../org/apache/tez/client/FrameworkClient.java  |    2 +
 .../java/org/apache/tez/client/TezClient.java   |   34 +-
 .../org/apache/tez/client/TezClientUtils.java   |   18 +-
 .../org/apache/tez/client/TezYarnClient.java    |    9 +
 .../org/apache/tez/common/ATSConstants.java     |    1 +
 .../org/apache/tez/common/TezYARNUtils.java     |   32 +-
 .../apache/tez/common/security/TokenCache.java  |   16 +-
 .../main/java/org/apache/tez/dag/api/DAG.java   |   15 +
 .../org/apache/tez/dag/api/HistoryLogLevel.java |    1 +
 .../apache/tez/dag/api/TezConfiguration.java    |   82 +-
 .../org/apache/tez/dag/api/TezConstants.java    |    6 +
 .../apache/tez/dag/api/client/DAGClient.java    |   13 +
 .../tez/dag/api/client/DAGClientImpl.java       |   33 +-
 .../tez/dag/api/client/DAGClientInternal.java   |   14 +-
 .../dag/api/client/DAGClientTimelineImpl.java   |   11 +-
 .../dag/api/client/rpc/DAGClientRPCImpl.java    |   23 +-
 .../api/TaskSchedulerContext.java               |    3 +-
 .../org/apache/tez/client/TestTezClient.java    |   53 +-
 .../apache/tez/client/TestTezClientUtils.java   |   41 +
 .../org/apache/tez/common/TestTezYARNUtils.java |   52 +
 .../tez/common/security/TestTokenCache.java     |   57 +-
 .../org/apache/tez/dag/api/TestDAGVerify.java   |   25 +
 .../tez/dag/api/client/rpc/TestDAGClient.java   |   25 +-
 .../org/apache/tez/common/TezUtilsInternal.java |   40 +
 .../org/apache/tez/dag/records/TezDAGID.java    |   64 +-
 .../java/org/apache/tez/dag/records/TezID.java  |   21 +
 .../tez/dag/records/TezTaskAttemptID.java       |   57 +-
 .../org/apache/tez/dag/records/TezTaskID.java   |   51 +-
 .../org/apache/tez/dag/records/TezVertexID.java |   48 +-
 .../org/apache/tez/util/FastNumberFormat.java   |   55 +
 .../org/apache/tez/util/TestNumberFormat.java   |   39 +
 .../java/org/apache/tez/client/LocalClient.java |    7 +-
 .../java/org/apache/tez/dag/app/AppContext.java |    5 +
 .../org/apache/tez/dag/app/DAGAppMaster.java    |   62 +-
 .../dag/app/TaskCommunicatorContextImpl.java    |   25 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |    2 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |    3 +-
 .../dag/app/rm/TaskSchedulerContextImpl.java    |    7 +-
 .../app/rm/TaskSchedulerContextImplWrapper.java |   11 +-
 .../tez/dag/app/rm/TaskSchedulerManager.java    |   18 +-
 .../dag/app/rm/YarnTaskSchedulerService.java    |    3 +-
 .../tez/dag/app/rm/container/AMContainer.java   |    1 +
 .../app/rm/container/AMContainerHelpers.java    |   18 +-
 .../dag/app/rm/container/AMContainerImpl.java   |   67 +-
 .../dag/app/rm/container/AMContainerMap.java    |   40 +-
 .../org/apache/tez/dag/app/rm/node/AMNode.java  |    3 +
 .../rm/node/AMNodeEventContainerCompleted.java  |   37 +
 .../tez/dag/app/rm/node/AMNodeEventType.java    |    5 +-
 .../apache/tez/dag/app/rm/node/AMNodeImpl.java  |   67 +-
 .../tez/dag/app/rm/node/AMNodeTracker.java      |    5 +-
 .../dag/app/rm/node/PerSourceNodeTracker.java   |   11 +-
 .../apache/tez/dag/app/web/AMWebController.java |   32 +-
 .../tez/dag/history/HistoryEventHandler.java    |  137 +-
 .../tez/dag/history/HistoryEventType.java       |    4 +-
 .../dag/history/events/DAGSubmittedEvent.java   |   17 +-
 .../impl/HistoryEventJsonConversion.java        |    6 +
 tez-dag/src/main/proto/HistoryEvents.proto      |    1 +
 .../resources/tez-container-log4j.properties    |    8 +
 .../apache/tez/dag/app/MockDAGAppMaster.java    |    2 +-
 .../apache/tez/dag/app/TestDAGAppMaster.java    |    8 +-
 .../apache/tez/dag/app/TestRecoveryParser.java  |   26 +-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |   76 +
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |   65 +
 .../tez/dag/app/rm/TestTaskScheduler.java       |    3 +-
 .../dag/app/rm/TestTaskSchedulerHelpers.java    |    4 +-
 .../dag/app/rm/TestTaskSchedulerManager.java    |   42 +-
 .../dag/app/rm/container/TestAMContainer.java   |  136 +-
 .../app/rm/container/TestAMContainerMap.java    |  126 +-
 .../tez/dag/app/rm/node/TestAMNodeTracker.java  |   73 +
 .../tez/dag/app/web/TestAMWebController.java    |   38 +
 .../dag/history/TestHistoryEventHandler.java    |   81 +-
 .../TestHistoryEventsProtoConversion.java       |    4 +-
 .../impl/TestHistoryEventJsonConversion.java    |   14 +-
 .../history/recovery/TestRecoveryService.java   |    2 +-
 .../org/apache/tez/examples/ExampleDriver.java  |    2 +
 tez-ext-service-tests/pom.xml                   |    5 +
 .../apache/tez/dag/api/client/MRDAGClient.java  |   10 +
 .../tez/mapreduce/hadoop/MRInputHelpers.java    |    2 +
 .../apache/tez/mapreduce/processor/MRTask.java  |   22 -
 .../common/TestMRInputAMSplitGenerator.java     |   58 +-
 .../logging/ats/TimelineCachePluginImpl.java    |    1 -
 .../ats/TestTimelineCachePluginImpl.java        |   41 +-
 .../ats/acls/TestATSHistoryWithACLs.java        |    4 +-
 .../ats/TestATSV15HistoryLoggingService.java    |  146 +-
 .../ats/HistoryEventTimelineConversion.java     |    6 +
 .../ats/TestATSHistoryLoggingService.java       |    2 +-
 .../ats/TestHistoryEventTimelineConversion.java |   15 +-
 .../common/resources/MemoryDistributor.java     |   12 +-
 .../apache/tez/runtime/task/TaskReporter.java   |   15 +-
 .../org/apache/tez/runtime/task/TezChild.java   |    2 +
 .../tez/runtime/task/TestTaskExecution2.java    |   42 +-
 .../org/apache/tez/http/HttpConnection.java     |   18 +-
 .../java/org/apache/tez/http/SSLFactory.java    |    2 +-
 .../http/async/netty/AsyncHttpConnection.java   |    1 +
 .../CartesianProductCombination.java            |    3 +
 .../CartesianProductConfig.java                 |   10 +
 .../CartesianProductEdgeManagerConfig.java      |   12 +-
 ...artesianProductEdgeManagerUnpartitioned.java |   60 +-
 .../CartesianProductVertexManager.java          |   28 +-
 .../CartesianProductVertexManagerConfig.java    |   20 +-
 ...artesianProductVertexManagerPartitioned.java |    4 +
 .../CartesianProductVertexManagerReal.java      |    3 +-
 ...tesianProductVertexManagerUnpartitioned.java |  117 +-
 .../runtime/library/common/shuffle/Fetcher.java |    4 +-
 .../library/common/shuffle/ShuffleUtils.java    |  149 +-
 .../common/shuffle/impl/ShuffleManager.java     |    7 +-
 .../orderedgrouped/FetcherOrderedGrouped.java   |    4 +-
 .../shuffle/orderedgrouped/MergeManager.java    |  213 +-
 .../orderedgrouped/ShuffleScheduler.java        |    8 +-
 .../runtime/library/common/sort/impl/IFile.java |    6 +-
 .../common/sort/impl/PipelinedSorter.java       |   29 +-
 .../library/common/sort/impl/TezMerger.java     |   56 +-
 .../WeightedScalingMemoryDistributor.java       |   62 +-
 .../tez/runtime/library/utils/Grouper.java      |   89 +
 .../main/proto/CartesianProductPayload.proto    |    5 +-
 .../TestWeightedScalingMemoryDistributor.java   |  165 ++
 .../TestCartesianProductCombination.java        |    9 +
 .../TestCartesianProductConfig.java             |   34 +-
 .../TestCartesianProductEdgeManager.java        |   13 +-
 .../TestCartesianProductEdgeManagerConfig.java  |   50 +
 ...tCartesianProductEdgeManagerPartitioned.java |    6 +-
 ...artesianProductEdgeManagerUnpartitioned.java |  125 +-
 ...TestCartesianProductVertexManagerConfig.java |   53 +
 ...artesianProductVertexManagerPartitioned.java |    9 +-
 ...tesianProductVertexManagerUnpartitioned.java |  141 +-
 .../library/cartesianproduct/TestGrouper.java   |   80 +
 .../common/shuffle/TestShuffleUtils.java        |   40 +-
 .../orderedgrouped/TestMergeManager.java        |   98 +-
 .../common/sort/impl/TestPipelinedSorter.java   |   18 +-
 .../org/apache/tez/mapreduce/TestMRRJobs.java   |    2 +
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |    1 +
 .../org/apache/tez/test/TestAMRecovery.java     |    1 +
 .../org/apache/tez/test/TestDAGRecovery.java    |    1 +
 .../org/apache/tez/test/TestDAGRecovery2.java   |    1 +
 .../tez/test/TestExceptionPropagation.java      |    1 +
 .../org/apache/tez/test/TestFaultTolerance.java |   50 +-
 .../apache/tez/test/TestPipelinedShuffle.java   |    2 +
 .../java/org/apache/tez/test/TestRecovery.java  |    1 +
 .../org/apache/tez/test/TestSecureShuffle.java  |    3 +
 .../tez/test/TestTaskErrorsUsingLocalMode.java  |    3 +-
 .../java/org/apache/tez/test/TestTezJobs.java   |    3 +-
 tez-ui/pom.xml                                  |   10 +
 tez-ui/src/main/webapp/app/adapters/timeline.js |   11 +-
 .../main/webapp/app/components/caller-info.js   |    2 +
 .../webapp/app/components/dags-page-search.js   |    2 +
 .../webapp/app/components/dags-pagination-ui.js |  106 -
 .../webapp/app/components/date-formatter.js     |    8 +
 .../components/em-table-tasks-log-link-cell.js  |   33 +
 .../main/webapp/app/components/em-tooltip.js    |    8 +-
 .../main/webapp/app/components/pagination-ui.js |  106 +
 .../app/components/queries-page-search.js       |   61 +
 .../webapp/app/components/query-timeline.js     |   79 +
 .../webapp/app/components/zip-download-modal.js |    2 +-
 .../main/webapp/app/controllers/application.js  |    4 +-
 .../main/webapp/app/controllers/dag/attempts.js |   20 +
 .../webapp/app/controllers/dag/graphical.js     |    4 +
 .../main/webapp/app/controllers/dag/index.js    |    5 +
 .../main/webapp/app/controllers/dag/swimlane.js |   16 +-
 .../main/webapp/app/controllers/dag/tasks.js    |   13 +
 tez-ui/src/main/webapp/app/controllers/dags.js  |  166 --
 tez-ui/src/main/webapp/app/controllers/home.js  |   31 +
 .../main/webapp/app/controllers/home/index.js   |  205 ++
 .../main/webapp/app/controllers/home/queries.js |  206 ++
 tez-ui/src/main/webapp/app/controllers/query.js |   44 +
 .../webapp/app/controllers/query/configs.js     |   60 +
 .../main/webapp/app/controllers/query/index.js  |   22 +
 .../webapp/app/controllers/query/timeline.js    |   57 +
 tez-ui/src/main/webapp/app/controllers/table.js |    4 +-
 .../webapp/app/controllers/task/attempts.js     |   20 +
 .../webapp/app/controllers/vertex/attempts.js   |   20 +
 .../main/webapp/app/controllers/vertex/tasks.js |   13 +
 tez-ui/src/main/webapp/app/entities/entity.js   |   71 +-
 tez-ui/src/main/webapp/app/initializers/env.js  |    1 +
 .../src/main/webapp/app/initializers/jquery.js  |    1 -
 tez-ui/src/main/webapp/app/models/abstract.js   |    1 +
 tez-ui/src/main/webapp/app/models/ahs-app.js    |   13 +-
 tez-ui/src/main/webapp/app/models/attempt.js    |   19 +-
 tez-ui/src/main/webapp/app/models/dag.js        |   10 +-
 tez-ui/src/main/webapp/app/models/hive-query.js |   51 +
 tez-ui/src/main/webapp/app/models/task.js       |    3 +
 tez-ui/src/main/webapp/app/models/timed.js      |   43 +
 tez-ui/src/main/webapp/app/models/timeline.js   |   11 +-
 tez-ui/src/main/webapp/app/models/vertex.js     |    7 +
 tez-ui/src/main/webapp/app/router.js            |   16 +-
 .../src/main/webapp/app/routes/application.js   |    4 +-
 tez-ui/src/main/webapp/app/routes/dag/tasks.js  |   20 +
 tez-ui/src/main/webapp/app/routes/dags.js       |  172 --
 tez-ui/src/main/webapp/app/routes/home.js       |   29 +
 tez-ui/src/main/webapp/app/routes/home/index.js |  108 +
 .../src/main/webapp/app/routes/home/queries.js  |  105 +
 .../main/webapp/app/routes/multi-am-pollster.js |    7 +-
 tez-ui/src/main/webapp/app/routes/query.js      |   38 +
 .../src/main/webapp/app/routes/query/configs.js |   38 +
 .../src/main/webapp/app/routes/query/index.js   |   35 +
 .../main/webapp/app/routes/query/timeline.js    |   35 +
 .../main/webapp/app/routes/server-side-ops.js   |  107 +
 .../webapp/app/routes/single-am-pollster.js     |    7 +-
 .../src/main/webapp/app/routes/vertex/tasks.js  |   20 +
 .../src/main/webapp/app/serializers/attempt.js  |   12 +-
 tez-ui/src/main/webapp/app/serializers/dag.js   |    4 +-
 .../main/webapp/app/serializers/hive-query.js   |   85 +-
 tez-ui/src/main/webapp/app/serializers/task.js  |    5 +-
 tez-ui/src/main/webapp/app/styles/app.less      |    2 +
 .../webapp/app/styles/dags-page-search.less     |    4 +-
 .../main/webapp/app/styles/details-page.less    |    6 +
 .../webapp/app/styles/queries-page-search.less  |   78 +
 .../main/webapp/app/styles/query-timeline.less  |  159 ++
 .../webapp/app/styles/zip-download-modal.less   |   13 +-
 .../main/webapp/app/templates/attempt/index.hbs |   16 +
 .../app/templates/components/caller-info.hbs    |    6 +-
 .../templates/components/dags-page-search.hbs   |    8 +
 .../templates/components/dags-pagination-ui.hbs |   48 -
 .../app/templates/components/date-formatter.hbs |    2 +-
 .../components/em-table-tasks-log-link-cell.hbs |   25 +
 .../app/templates/components/em-tooltip.hbs     |   18 +-
 .../app/templates/components/pagination-ui.hbs  |   48 +
 .../components/queries-page-search.hbs          |   88 +
 .../app/templates/components/query-timeline.hbs |  124 +
 .../templates/components/zip-download-modal.hbs |   12 +-
 .../src/main/webapp/app/templates/dag/index.hbs |    6 +-
 tez-ui/src/main/webapp/app/templates/dags.hbs   |   42 -
 tez-ui/src/main/webapp/app/templates/home.hbs   |   20 +
 .../main/webapp/app/templates/home/index.hbs    |   46 +
 .../main/webapp/app/templates/home/queries.hbs  |   41 +
 tez-ui/src/main/webapp/app/templates/query.hbs  |   20 +
 .../main/webapp/app/templates/query/configs.hbs |   34 +
 .../main/webapp/app/templates/query/index.hbs   |  128 +
 .../webapp/app/templates/query/timeline.hbs     |   43 +
 .../main/webapp/app/templates/task/index.hbs    |   25 +
 .../main/webapp/app/templates/vertex/index.hbs  |   11 +
 .../main/webapp/app/utils/download-dag-zip.js   |   75 +-
 .../src/main/webapp/app/utils/vertex-process.js |   13 +-
 .../src/main/webapp/app/utils/virtual-anchor.js |   32 +
 tez-ui/src/main/webapp/package.json             |    6 +-
 .../components/dags-page-search-test.js         |    4 +-
 .../components/dags-pagination-ui-test.js       |  158 --
 .../components/date-formatter-test.js           |   11 +
 .../em-table-tasks-log-link-cell-test.js        |   53 +
 .../components/pagination-ui-test.js            |  158 ++
 .../components/queries-page-search-test.js      |   76 +
 .../components/query-timeline-test.js           |  182 ++
 .../components/zip-download-modal-test.js       |   52 +-
 .../webapp/tests/unit/adapters/timeline-test.js |    5 +
 .../tests/unit/controllers/application-test.js  |   10 +-
 .../tests/unit/controllers/dag/attempts-test.js |    2 +-
 .../unit/controllers/dag/graphical-test.js      |    1 +
 .../tests/unit/controllers/dag/index-test.js    |    1 +
 .../tests/unit/controllers/dag/swimlane-test.js |    1 +
 .../tests/unit/controllers/dag/tasks-test.js    |   28 +
 .../webapp/tests/unit/controllers/dags-test.js  |   52 -
 .../webapp/tests/unit/controllers/home-test.js  |   46 +
 .../tests/unit/controllers/home/index-test.js   |  143 ++
 .../tests/unit/controllers/home/queries-test.js |  148 ++
 .../webapp/tests/unit/controllers/query-test.js |   51 +
 .../unit/controllers/query/configs-test.js      |   55 +
 .../tests/unit/controllers/query/index-test.js  |   33 +
 .../unit/controllers/query/timeline-test.js     |   63 +
 .../webapp/tests/unit/controllers/table-test.js |   39 +
 .../unit/controllers/task/attempts-test.js      |    2 +-
 .../unit/controllers/vertex/attempts-test.js    |    2 +-
 .../tests/unit/controllers/vertex/tasks-test.js |   28 +
 .../webapp/tests/unit/entities/entity-test.js   |  131 +-
 .../tests/unit/initializers/jquery-test.js      |    5 +-
 .../webapp/tests/unit/models/abstract-test.js   |   27 +
 .../webapp/tests/unit/models/ahs-app-test.js    |   15 +-
 .../webapp/tests/unit/models/attempt-test.js    |   31 +
 .../main/webapp/tests/unit/models/dag-test.js   |   37 +
 .../webapp/tests/unit/models/hive-query-test.js |   33 +
 .../main/webapp/tests/unit/models/task-test.js  |    5 +
 .../main/webapp/tests/unit/models/timed-test.js |   86 +
 .../webapp/tests/unit/models/timeline-test.js   |   14 -
 .../webapp/tests/unit/models/vertex-test.js     |   27 +
 .../webapp/tests/unit/routes/dag/tasks-test.js  |   43 +
 .../main/webapp/tests/unit/routes/dags-test.js  |  175 --
 .../main/webapp/tests/unit/routes/home-test.js  |   32 +
 .../webapp/tests/unit/routes/home/index-test.js |  164 ++
 .../tests/unit/routes/home/queries-test.js      |  120 +
 .../tests/unit/routes/multi-am-pollster-test.js |   44 +
 .../main/webapp/tests/unit/routes/query-test.js |   35 +
 .../tests/unit/routes/query/configs-test.js     |   33 +
 .../tests/unit/routes/query/index-test.js       |   35 +
 .../tests/unit/routes/query/timeline-test.js    |   35 +
 .../tests/unit/routes/server-side-ops-test.js   |  176 ++
 .../unit/routes/single-am-pollster-test.js      |   23 +-
 .../tests/unit/routes/vertex/tasks-test.js      |   41 +
 .../tests/unit/serializers/attempt-test.js      |    9 +-
 .../webapp/tests/unit/serializers/dag-test.js   |    2 +
 .../tests/unit/serializers/hive-query-test.js   |   84 +-
 .../webapp/tests/unit/serializers/task-test.js  |    2 +
 .../tests/unit/utils/vertex-process-test.js     |   31 +-
 .../tests/unit/utils/virtual-anchor-test.js     |   40 +
 299 files changed, 9074 insertions(+), 4192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/docs/src/site/site.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --cc tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 51e954d,3bac7b5..173b458
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@@ -95,7 -94,7 +95,11 @@@ public class AMContainerHelpers 
     */
    private static ContainerLaunchContext createCommonContainerLaunchContext(
        Map<ApplicationAccessType, String> applicationACLs,
++<<<<<<< HEAD
 +      Credentials credentials, Map<String, LocalResource> localResources, Configuration conf) {
++=======
+       Credentials credentials) {
++>>>>>>> master
  
      // Application environment
      Map<String, String> environment = new HashMap<String, String>();

http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 3afbb23,9d1f42a..f31425e
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@@ -811,165 -726,125 +811,165 @@@ public class Fetcher extends CallableWi
      }
    }
  
 +  private static class MapOutputStat {
 +    final InputAttemptIdentifier srcAttemptId;
 +    final long decompressedLength;
 +    final long compressedLength;
 +    final int forReduce;
 +
 +    MapOutputStat(InputAttemptIdentifier srcAttemptId, long decompressedLength, long compressedLength, int forReduce) {
 +      this.srcAttemptId = srcAttemptId;
 +      this.decompressedLength = decompressedLength;
 +      this.compressedLength = compressedLength;
 +      this.forReduce = forReduce;
 +    }
 +
 +    @Override
 +    public String toString() {
 +      return new String("id: " + srcAttemptId + ", decompressed length: " + decompressedLength + ", compressed length: " + compressedLength + ", reduce: " + forReduce);
 +    }
 +  }
    private InputAttemptIdentifier[] fetchInputs(DataInputStream input,
 -      CachingCallBack callback) throws FetcherReadTimeoutException {
 +      CachingCallBack callback, InputAttemptIdentifier inputAttemptIdentifier) throws FetcherReadTimeoutException {
      FetchedInput fetchedInput = null;
      InputAttemptIdentifier srcAttemptId = null;
 -    long decompressedLength = -1;
 -    long compressedLength = -1;
 -
 +    long decompressedLength = 0;
 +    long compressedLength = 0;
      try {
        long startTime = System.currentTimeMillis();
 -      int responsePartition = -1;
 -      // Read the shuffle header
 -      String pathComponent = null;
 -      try {
 -        ShuffleHeader header = new ShuffleHeader();
 -        header.readFields(input);
 -        pathComponent = header.getMapId();
 -
 -        srcAttemptId = pathToAttemptMap.get(pathComponent);
 -        compressedLength = header.getCompressedLength();
 -        decompressedLength = header.getUncompressedLength();
 -        responsePartition = header.getPartition();
 -      } catch (IllegalArgumentException e) {
 -        // badIdErrs.increment(1);
 -        if (!isShutDown.get()) {
 -          LOG.warn("Invalid src id ", e);
 -          // Don't know which one was bad, so consider all of them as bad
 -          return srcAttemptsRemaining.values().toArray(new InputAttemptIdentifier[srcAttemptsRemaining.size()]);
 -        } else {
 -          if (isDebugEnabled) {
 -            LOG.debug("Already shutdown. Ignoring badId error with message: " + e.getMessage());
 -          }
 -          return null;
 -        }
 +      int partitionCount = 1;
 +
 +      if (this.compositeFetch) {
 +        // Multiple partitions are fetched
 +        partitionCount = WritableUtils.readVInt(input);
        }
 +      ArrayList<MapOutputStat> mapOutputStats = new ArrayList<>(partitionCount);
 +      for (int mapOutputIndex = 0; mapOutputIndex < partitionCount; mapOutputIndex++) {
 +        MapOutputStat mapOutputStat = null;
 +        int responsePartition = -1;
 +        // Read the shuffle header
 +        String pathComponent = null;
 +        try {
 +          ShuffleHeader header = new ShuffleHeader();
 +          header.readFields(input);
 +          pathComponent = header.getMapId();
 +          srcAttemptId = pathToAttemptMap.get(new PathPartition(pathComponent, header.getPartition()));
 +
 +          if (header.getCompressedLength() == 0) {
 +            // Empty partitions are already accounted for
 +            continue;
 +          }
  
 -      // Do some basic sanity verification
 -      if (!verifySanity(compressedLength, decompressedLength,
 -          responsePartition, srcAttemptId, pathComponent)) {
 -        if (!isShutDown.get()) {
 -          if (srcAttemptId == null) {
 -            LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
 -            srcAttemptId = getNextRemainingAttempt();
 +          mapOutputStat = new MapOutputStat(srcAttemptId,
 +              header.getUncompressedLength(),
 +              header.getCompressedLength(),
 +              header.getPartition());
 +          mapOutputStats.add(mapOutputStat);
 +          responsePartition = header.getPartition();
 +        } catch (IllegalArgumentException e) {
 +          // badIdErrs.increment(1);
 +          if (!isShutDown.get()) {
 +            LOG.warn("Invalid src id ", e);
 +            // Don't know which one was bad, so consider all of them as bad
 +            return srcAttemptsRemaining.values().toArray(new InputAttemptIdentifier[srcAttemptsRemaining.size()]);
 +          } else {
 +            if (isDebugEnabled) {
 +              LOG.debug("Already shutdown. Ignoring badId error with message: " + e.getMessage());
 +            }
 +            return null;
            }
 -          assert (srcAttemptId != null);
 -          return new InputAttemptIdentifier[]{srcAttemptId};
 -        } else {
 -          if (isDebugEnabled) {
 -            LOG.debug("Already shutdown. Ignoring verification failure.");
 +        }
 +
 +        // Do some basic sanity verification
 +        if (!verifySanity(mapOutputStat.compressedLength, mapOutputStat.decompressedLength,
 +            responsePartition, mapOutputStat.srcAttemptId, pathComponent)) {
 +          if (!isShutDown.get()) {
 +            srcAttemptId = mapOutputStat.srcAttemptId;
 +            if (srcAttemptId == null) {
 +              LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
 +              srcAttemptId = getNextRemainingAttempt();
 +            }
 +            assert (srcAttemptId != null);
 +            return new InputAttemptIdentifier[]{srcAttemptId};
 +          } else {
 +            if (isDebugEnabled) {
 +              LOG.debug("Already shutdown. Ignoring verification failure.");
 +            }
 +            return null;
            }
 -          return null;
          }
 -      }
  
 -      if (isDebugEnabled) {
 -        LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength
 -            + ", decomp len: " + decompressedLength);
 -      }
 -      
 -      // TODO TEZ-957. handle IOException here when Broadcast has better error checking
 -      if (srcAttemptId.isShared() && callback != null) {
 -        // force disk if input is being shared
 -        fetchedInput = inputManager.allocateType(Type.DISK, decompressedLength,
 -            compressedLength, srcAttemptId);
 -      } else {
 -        fetchedInput = inputManager.allocate(decompressedLength,
 -            compressedLength, srcAttemptId);
 -      }
 -      // No concept of WAIT at the moment.
 -      // // Check if we can shuffle *now* ...
 -      // if (fetchedInput.getType() == FetchedInput.WAIT) {
 -      // LOG.info("fetcher#" + id +
 -      // " - MergerManager returned Status.WAIT ...");
 -      // //Not an error but wait to process data.
 -      // return EMPTY_ATTEMPT_ID_ARRAY;
 -      // }
 -
 -      // Go!
 -      if (isDebugEnabled) {
 -        LOG.debug("fetcher" + " about to shuffle output of srcAttempt "
 -            + fetchedInput.getInputAttemptIdentifier() + " decomp: "
 -            + decompressedLength + " len: " + compressedLength + " to "
 -            + fetchedInput.getType());
 +        if (isDebugEnabled) {
 +          LOG.debug("header: " + mapOutputStat.srcAttemptId + ", len: " + mapOutputStat.compressedLength
 +              + ", decomp len: " + mapOutputStat.decompressedLength);
 +        }
        }
  
 -      if (fetchedInput.getType() == Type.MEMORY) {
 -        ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(),
 -          input, (int) decompressedLength, (int) compressedLength, codec,
 -          ifileReadAhead, ifileReadAheadLength, LOG,
 -          fetchedInput.getInputAttemptIdentifier());
 -      } else if (fetchedInput.getType() == Type.DISK) {
 -        ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(),
 -          (host +":" +port), input, compressedLength, decompressedLength, LOG,
 -          fetchedInput.getInputAttemptIdentifier(),
 -          ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum);
 -      } else {
 -        throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " +
 -            fetchedInput);
 -      }
 +      for (MapOutputStat mapOutputStat : mapOutputStats) {
 +        // Get the location for the map output - either in-memory or on-disk
 +        srcAttemptId = mapOutputStat.srcAttemptId;
 +        decompressedLength = mapOutputStat.decompressedLength;
 +        compressedLength = mapOutputStat.compressedLength;
 +        // TODO TEZ-957. handle IOException here when Broadcast has better error checking
 +        if (srcAttemptId.isShared() && callback != null) {
 +          // force disk if input is being shared
 +          fetchedInput = inputManager.allocateType(Type.DISK, decompressedLength,
 +              compressedLength, srcAttemptId);
 +        } else {
 +          fetchedInput = inputManager.allocate(decompressedLength,
 +              compressedLength, srcAttemptId);
 +        }
 +        // No concept of WAIT at the moment.
 +        // // Check if we can shuffle *now* ...
 +        // if (fetchedInput.getType() == FetchedInput.WAIT) {
 +        // LOG.info("fetcher#" + id +
 +        // " - MergerManager returned Status.WAIT ...");
 +        // //Not an error but wait to process data.
 +        // return EMPTY_ATTEMPT_ID_ARRAY;
 +        // }
 +
 +        // Go!
 +        if (isDebugEnabled) {
 +          LOG.debug("fetcher" + " about to shuffle output of srcAttempt "
 +              + fetchedInput.getInputAttemptIdentifier() + " decomp: "
 +              + decompressedLength + " len: " + compressedLength + " to "
 +              + fetchedInput.getType());
 +        }
  
 -      // offer the fetched input for caching
 -      if (srcAttemptId.isShared() && callback != null) {
 -        // this has to be before the fetchSucceeded, because that goes across
 -        // threads into the reader thread and can potentially shutdown this thread
 -        // while it is still caching.
 -        callback.cache(host, srcAttemptId, fetchedInput, compressedLength, decompressedLength);
 -      }
 +        if (fetchedInput.getType() == Type.MEMORY) {
 +          ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(),
 +              input, (int) decompressedLength, (int) compressedLength, codec,
 +              ifileReadAhead, ifileReadAheadLength, LOG,
-               fetchedInput.getInputAttemptIdentifier().toString());
++              fetchedInput.getInputAttemptIdentifier());
 +        } else if (fetchedInput.getType() == Type.DISK) {
 +          ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(),
 +              (host + ":" + port), input, compressedLength, decompressedLength, LOG,
-               fetchedInput.getInputAttemptIdentifier().toString(),
++              fetchedInput.getInputAttemptIdentifier(),
 +              ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum);
 +        } else {
 +          throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " +
 +              fetchedInput);
 +        }
  
 -      // Inform the shuffle scheduler
 -      long endTime = System.currentTimeMillis();
 -      // Reset retryStartTime as map task make progress if retried before.
 -      retryStartTime = 0;
 -      fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput,
 -          compressedLength, decompressedLength, (endTime - startTime));
 +        // offer the fetched input for caching
 +        if (srcAttemptId.isShared() && callback != null) {
 +          // this has to be before the fetchSucceeded, because that goes across
 +          // threads into the reader thread and can potentially shutdown this thread
 +          // while it is still caching.
 +          callback.cache(host, srcAttemptId, fetchedInput, compressedLength, decompressedLength);
 +        }
  
 -      // Note successful shuffle
 -      srcAttemptsRemaining.remove(srcAttemptId.toString());
 +        // Inform the shuffle scheduler
 +        long endTime = System.currentTimeMillis();
 +        // Reset retryStartTime as map task make progress if retried before.
 +        retryStartTime = 0;
 +        fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput,
 +            compressedLength, decompressedLength, (endTime - startTime));
  
 -      // metrics.successFetch();
 -      return null;
 +        // Note successful shuffle
 +        // metrics.successFetch();
 +      }
 +      srcAttemptsRemaining.remove(inputAttemptIdentifier.toString());
      } catch (IOException ioe) {
        if (isShutDown.get()) {
          cleanupFetchedInput(fetchedInput);

http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 1d644aa,caddbc8..710466f
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@@ -40,12 -41,14 +41,13 @@@ import com.google.protobuf.ByteString
  
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.io.DataOutputBuffer;
 +import org.apache.tez.dag.api.TezConfiguration;
  import org.apache.tez.http.BaseHttpConnection;
 -import org.apache.tez.http.HttpConnection;
  import org.apache.tez.http.HttpConnectionParams;
 -import org.apache.tez.http.SSLFactory;
 -import org.apache.tez.http.async.netty.AsyncHttpConnection;
  import org.apache.tez.runtime.api.events.DataMovementEvent;
 +import org.apache.tez.runtime.library.common.TezRuntimeUtils;
  import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
+ import org.apache.tez.util.FastNumberFormat;
  import org.roaringbitmap.RoaringBitmap;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index a23ce72,b2ff51d..5661a6d
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@@ -639,13 -626,13 +642,13 @@@ public class ShuffleManager implements 
      
      inputContext.notifyProgress();
      boolean committed = false;
 -    if (!completedInputSet.contains(inputIdentifier)) {
 +    if (!completedInputSet.get(inputIdentifier)) {
        synchronized (completedInputSet) {
 -        if (!completedInputSet.contains(inputIdentifier)) {
 +        if (!completedInputSet.get(inputIdentifier)) {
            fetchedInput.commit();
            committed = true;
-           ShuffleUtils.logIndividualFetchComplete(LOG, copyDuration,
-               fetchedBytes, decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier);
+           fetchStatsLogger.logIndividualFetchComplete(copyDuration, fetchedBytes,
+               decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier);
  
            // Processing counters for completed and commit fetches only. Need
            // additional counters for excessive fetches - which primarily comes

http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 1bfd2a6,58ca1e2..4b7c7fd
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@@ -533,47 -464,69 +533,47 @@@ class FetcherOrderedGrouped extends Cal
            }
            return EMPTY_ATTEMPT_ID_ARRAY;
          }
 -      }
 -      
 -      if(LOG.isDebugEnabled()) {
 -        LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength + 
 -            ", decomp len: " + decompressedLength);
 -      }
  
 -      // Get the location for the map output - either in-memory or on-disk
 -      try {
 -        mapOutput = allocator.reserve(srcAttemptId, decompressedLength, compressedLength, id);
 -      } catch (IOException e) {
 -        if (!stopped) {
 -          // Kill the reduce attempt
 -          ioErrs.increment(1);
 -          scheduler.reportLocalError(e);
 -        } else {
 -          if (LOG.isDebugEnabled()) {
 -            LOG.debug("Already stopped. Ignoring error from merger.reserve");
 -          }
 +        // Check if we can shuffle *now* ...
 +        if (mapOutput.getType() == Type.WAIT) {
 +          LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
 +          //Not an error but wait to process data.
 +          return EMPTY_ATTEMPT_ID_ARRAY;
          }
 -        return EMPTY_ATTEMPT_ID_ARRAY;
 -      }
 -      
 -      // Check if we can shuffle *now* ...
 -      if (mapOutput.getType() == Type.WAIT) {
 -        LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
 -        //Not an error but wait to process data.
 -        return EMPTY_ATTEMPT_ID_ARRAY;
 -      } 
 -      
 -      // Go!
 -      if (LOG.isDebugEnabled()) {
 -        LOG.debug("fetcher#" + id + " about to shuffle output of map " +
 -            mapOutput.getAttemptIdentifier() + " decomp: " +
 -            decompressedLength + " len: " + compressedLength + " to " + mapOutput.getType());
 -      }
  
 -      if (mapOutput.getType() == Type.MEMORY) {
 -        ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input,
 -          (int) decompressedLength, (int) compressedLength, codec, ifileReadAhead,
 -          ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier());
 -      } else if (mapOutput.getType() == Type.DISK) {
 -        ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(),
 -          input, compressedLength, decompressedLength, LOG,
 -          mapOutput.getAttemptIdentifier(),
 -          ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum);
 -      } else {
 -        throw new IOException("Unknown mapOutput type while fetching shuffle data:" +
 -            mapOutput.getType());
 -      }
 +        // Go!
 +        if (LOG.isDebugEnabled()) {
 +          LOG.debug("fetcher#" + id + " about to shuffle output of map " +
 +              mapOutput.getAttemptIdentifier() + " decomp: " +
 +              decompressedLength + " len: " + compressedLength + " to " + mapOutput.getType());
 +        }
  
 -      // Inform the shuffle scheduler
 -      long endTime = System.currentTimeMillis();
 -      // Reset retryStartTime as map task make progress if retried before.
 -      retryStartTime = 0;
 +        if (mapOutput.getType() == Type.MEMORY) {
 +          ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input,
 +              (int) decompressedLength, (int) compressedLength, codec, ifileReadAhead,
-               ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString());
++              ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier());
 +        } else if (mapOutput.getType() == Type.DISK) {
 +          ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(),
 +              input, compressedLength, decompressedLength, LOG,
-               mapOutput.getAttemptIdentifier().toString(),
++              mapOutput.getAttemptIdentifier(),
 +              ifileReadAhead, ifileReadAheadLength, verifyDiskChecksum);
 +        } else {
 +          throw new IOException("Unknown mapOutput type while fetching shuffle data:" +
 +              mapOutput.getType());
 +        }
  
 -      scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength,
 -                              endTime - startTime, mapOutput, false);
 -      // Note successful shuffle
 -      remaining.remove(srcAttemptId.toString());
 -      metrics.successFetch();
 -      return null;
 -    } catch (IOException ioe) {
 +        // Inform the shuffle scheduler
 +        long endTime = System.currentTimeMillis();
 +        // Reset retryStartTime as map task make progress if retried before.
 +        retryStartTime = 0;
 +
 +        scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength,
 +            endTime - startTime, mapOutput, false);
 +        // Note successful shuffle
 +        metrics.successFetch();
 +      }
 +      remaining.remove(inputAttemptIdentifier.toString());
 +    } catch(IOException ioe) {
        if (stopped) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Not reporting fetch failure for exception during data copy: ["

http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/be0d01f6/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------


Mime
View raw message