tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject [43/50] [abbrv] tez git commit: Merge remote-tracking branch 'origin/TEZ-3334' into TEZ-3334-MERGE1
Date Wed, 24 May 2017 21:08:12 GMT
Merge remote-tracking branch 'origin/TEZ-3334' into TEZ-3334-MERGE1


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e1a9c282
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e1a9c282
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e1a9c282

Branch: refs/heads/master
Commit: e1a9c2822bb0d3e33cc5e8bc127991380d4ab54f
Parents: 54f7784 b81592a
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Thu May 4 15:52:23 2017 -0500
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Thu May 4 15:52:23 2017 -0500

----------------------------------------------------------------------
 TEZ-3334-CHANGES.txt                            |   5 +
 .../apache/tez/dag/api/TezConfiguration.java    |  10 +-
 .../tez/dag/app/launcher/DagDeleteRunnable.java |  10 +-
 .../tez/dag/app/launcher/DeletionTracker.java   |  12 +-
 .../dag/app/launcher/DeletionTrackerImpl.java   |  31 ++--
 .../app/launcher/LocalContainerLauncher.java    |  30 ++--
 .../app/launcher/TezContainerLauncherImpl.java  |  51 ++++---
 .../app/rm/container/AMContainerHelpers.java    |  10 +-
 tez-plugins/tez-aux-services/pom.xml            |   4 -
 .../apache/tez/auxservices/ShuffleHandler.java  |   8 +-
 .../tez/auxservices/TestShuffleHandlerJobs.java |  32 ++--
 .../runtime/library/common/TezRuntimeUtils.java |   6 +-
 .../library/common/shuffle/InputHost.java       |   2 +-
 .../library/common/shuffle/ShuffleUtils.java    |  13 +-
 .../common/shuffle/impl/ShuffleManager.java     | 146 ++++++++-----------
 .../common/shuffle/orderedgrouped/Shuffle.java  |   3 +-
 .../ShuffleInputEventHandlerOrderedGrouped.java |   4 +-
 .../orderedgrouped/ShuffleScheduler.java        |   2 +-
 .../common/sort/impl/PipelinedSorter.java       |   9 +-
 .../common/sort/impl/dflt/DefaultSorter.java    |   6 +-
 .../output/OrderedPartitionedKVOutput.java      |   4 +-
 .../common/shuffle/TestShuffleUtils.java        |  13 +-
 ...tShuffleInputEventHandlerOrderedGrouped.java |   3 +-
 .../common/sort/impl/TestPipelinedSorter.java   |  10 +-
 .../TestUnorderedPartitionedKVWriter.java       |  21 ++-
 25 files changed, 227 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e1a9c282/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/e1a9c282/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --cc tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 1e23c8f,ba3ecad..31a5d92
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@@ -160,8 -159,10 +158,10 @@@ public class AMContainerHelpers 
      ContainerLaunchContext commonContainerSpec = null;
      synchronized (commonContainerSpecLock) {
        if (!commonContainerSpecs.containsKey(tezDAGID)) {
+         String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+             TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
          commonContainerSpec =
-             createCommonContainerLaunchContext(acls, credentials, conf);
 -            createCommonContainerLaunchContext(acls, credentials, commonDAGLRs, auxiliaryService);
++            createCommonContainerLaunchContext(acls, credentials, localResources, auxiliaryService);
          commonContainerSpecs.put(tezDAGID, commonContainerSpec);
        } else {
          commonContainerSpec = commonContainerSpecs.get(tezDAGID);

http://git-wip-us.apache.org/repos/asf/tez/blob/e1a9c282/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/e1a9c282/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --cc tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 5661a6d,57cf4d0..bc3ca0e
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@@ -636,60 -623,40 +626,40 @@@ public class ShuffleManager implements 
      lock.lock();
      try {
        lastProgressTime = System.currentTimeMillis();
-     } finally {
-       lock.unlock();
-     }
-     
-     inputContext.notifyProgress();
-     boolean committed = false;
-     if (!completedInputSet.get(inputIdentifier)) {
-       synchronized (completedInputSet) {
-         if (!completedInputSet.get(inputIdentifier)) {
-           fetchedInput.commit();
-           committed = true;
-           fetchStatsLogger.logIndividualFetchComplete(copyDuration, fetchedBytes,
-               decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier);
- 
-           // Processing counters for completed and commit fetches only. Need
-           // additional counters for excessive fetches - which primarily comes
-           // in after speculation or retries.
-           shuffledInputsCounter.increment(1);
-           bytesShuffledCounter.increment(fetchedBytes);
-           if (fetchedInput.getType() == Type.MEMORY) {
-             bytesShuffledToMemCounter.increment(fetchedBytes);
-           } else if (fetchedInput.getType() == Type.DISK) {
-             bytesShuffledToDiskCounter.increment(fetchedBytes);
-           } else if (fetchedInput.getType() == Type.DISK_DIRECT) {
-             bytesShuffledDirectDiskCounter.increment(fetchedBytes);
-           }
-           decompressedDataSizeCounter.increment(decompressedLength);
- 
-           if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
-             registerCompletedInput(fetchedInput);
-           } else {
-             registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput);
-           }
+       inputContext.notifyProgress();
+       if (!completedInputSet.get(inputIdentifier)) {
+         fetchedInput.commit();
 -        ShuffleUtils.logIndividualFetchComplete(LOG, copyDuration,
++        fetchStatsLogger.logIndividualFetchComplete(copyDuration,
+             fetchedBytes, decompressedLength, fetchedInput.getType().toString(), srcAttemptIdentifier);
+ 
+         // Processing counters for completed and commit fetches only. Need
+         // additional counters for excessive fetches - which primarily comes
+         // in after speculation or retries.
+         shuffledInputsCounter.increment(1);
+         bytesShuffledCounter.increment(fetchedBytes);
+         if (fetchedInput.getType() == Type.MEMORY) {
+           bytesShuffledToMemCounter.increment(fetchedBytes);
+         } else if (fetchedInput.getType() == Type.DISK) {
+           bytesShuffledToDiskCounter.increment(fetchedBytes);
+         } else if (fetchedInput.getType() == Type.DISK_DIRECT) {
+           bytesShuffledDirectDiskCounter.increment(fetchedBytes);
+         }
+         decompressedDataSizeCounter.increment(decompressedLength);
  
-           lock.lock();
-           try {
-             totalBytesShuffledTillNow += fetchedBytes;
-             logProgress();
-           } finally {
-             lock.unlock();
-           }
+         if (!srcAttemptIdentifier.canRetrieveInputInChunks()) {
+           registerCompletedInput(fetchedInput);
+         } else {
+           registerCompletedInputForPipelinedShuffle(srcAttemptIdentifier, fetchedInput);
          }
-       }
-     }
-     if (!committed) {
-       fetchedInput.abort(); // If this fails, the fetcher may attempt another abort.
-     } else {
-       lock.lock();
-       try {
-         // Signal the wakeLoop to check for termination.
+ 
+         totalBytesShuffledTillNow += fetchedBytes;
+         logProgress();
          wakeLoop.signal();
-       } finally {
-         lock.unlock();
+       } else {
+         fetchedInput.abort(); // If this fails, the fetcher may attempt another abort.
        }
+     } finally {
+       lock.unlock();
      }
      // TODO NEWTEZ Maybe inform fetchers, in case they have an alternate attempt of the
same task in their queue.
    }

http://git-wip-us.apache.org/repos/asf/tez/blob/e1a9c282/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/e1a9c282/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/e1a9c282/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/e1a9c282/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestPipelinedSorter.java
----------------------------------------------------------------------


Mime
View raw message