tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject [39/50] [abbrv] tez git commit: TEZ-3684. Incorporate first pass non-essential TEZ-3334 pre-merge feedback (jeagles)
Date Wed, 24 May 2017 21:08:08 GMT
TEZ-3684. Incorporate first pass non-essential TEZ-3334 pre-merge feedback (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e5d01a60
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e5d01a60
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e5d01a60

Branch: refs/heads/master
Commit: e5d01a60a040b7d60272a61cd540753bfe3478be
Parents: ff02c00
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Thu Apr 6 13:06:31 2017 -0500
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Thu Apr 6 13:06:31 2017 -0500

----------------------------------------------------------------------
 TEZ-3334-CHANGES.txt                            |  1 +
 .../app/launcher/TezContainerLauncherImpl.java  | 22 +++++++++------
 .../tez/auxservices/TestShuffleHandlerJobs.java | 28 ++++++++------------
 .../library/common/shuffle/InputHost.java       |  2 +-
 .../common/shuffle/impl/ShuffleManager.java     | 19 +++++++------
 5 files changed, 36 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e5d01a60/TEZ-3334-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-3334-CHANGES.txt b/TEZ-3334-CHANGES.txt
index 3b44690..61473e0 100644
--- a/TEZ-3334-CHANGES.txt
+++ b/TEZ-3334-CHANGES.txt
@@ -4,6 +4,7 @@ Apache Tez Change Log
 INCOMPATIBLE CHANGES:
 
 ALL CHANGES:
+  TEZ-3684. Incorporate first pass non-essential TEZ-3334 pre-merge feedback
   TEZ-3683. LocalContainerLauncher#shouldDelete member variable is not used
   TEZ-3682. Pass parameters instead of configuration for changes to support tez shuffle handler
   TEZ-3628. Give Tez shuffle handler threads custom names

http://git-wip-us.apache.org/repos/asf/tez/blob/e5d01a60/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
index 058abfe..f6a6874 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezContainerLauncherImpl.java
@@ -177,14 +177,20 @@ public class TezContainerLauncherImpl extends ContainerLauncher {
         this.state = ContainerState.RUNNING;
 
         int shufflePort = TezRuntimeUtils.INVALID_PORT;
-        ByteBuffer portInfo =
-            response.getAllServicesMetaData().get(
-                conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
-                    TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
-        if (portInfo != null) {
-          DataInputByteBuffer in = new DataInputByteBuffer();
-          in.reset(portInfo);
-          shufflePort = in.readInt();
+        Map<String, java.nio.ByteBuffer> servicesMetaData = response.getAllServicesMetaData();
+        if (servicesMetaData != null) {
+          String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
+              TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
+          ByteBuffer portInfo = servicesMetaData.get(auxiliaryService);
+          if (portInfo != null) {
+            DataInputByteBuffer in = new DataInputByteBuffer();
+            in.reset(portInfo);
+            shufflePort = in.readInt();
+          } else {
+            LOG.warn("Shuffle port for {} is not present is the services metadata response", auxiliaryService);
+          }
+        } else {
+          LOG.warn("Shuffle port cannot be found since services metadata response is missing");
         }
 
         deletionTracker.addNodeShufflePorts(event.getNodeId(), shufflePort);

http://git-wip-us.apache.org/repos/asf/tez/blob/e5d01a60/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java
index c409bf8..f719c13 100644
--- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java
+++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandlerJobs.java
@@ -20,17 +20,11 @@ package org.apache.tez.auxservices;
 import java.io.File;
 import java.io.IOException;
 
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
-    .GetApplicationReportRequestPBImpl;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -56,7 +50,7 @@ public class TestShuffleHandlerJobs {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestShuffleHandlerJobs.class);
 
-  protected static MiniTezCluster mrrTezCluster;
+  protected static MiniTezCluster tezCluster;
   protected static MiniDFSCluster dfsCluster;
 
   private static Configuration conf = new Configuration();
@@ -81,8 +75,8 @@ public class TestShuffleHandlerJobs {
       return;
     }
 
-    if (mrrTezCluster == null) {
-      mrrTezCluster = new MiniTezCluster(TestShuffleHandlerJobs.class.getName(), NUM_NMS,
+    if (tezCluster == null) {
+      tezCluster = new MiniTezCluster(TestShuffleHandlerJobs.class.getName(), NUM_NMS,
           1, 1);
       Configuration conf = new Configuration();
       conf.set(YarnConfiguration.NM_AUX_SERVICES,
@@ -94,17 +88,17 @@ public class TestShuffleHandlerJobs {
       conf.set("fs.defaultFS", remoteFs.getUri().toString());   // use HDFS
       conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/apps_staging_dir");
       conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l);
-      mrrTezCluster.init(conf);
-      mrrTezCluster.start();
+      tezCluster.init(conf);
+      tezCluster.start();
     }
 
   }
 
   @AfterClass
   public static void tearDown() {
-    if (mrrTezCluster != null) {
-      mrrTezCluster.stop();
-      mrrTezCluster = null;
+    if (tezCluster != null) {
+      tezCluster.stop();
+      tezCluster = null;
     }
     if (dfsCluster != null) {
       dfsCluster.shutdown();
@@ -123,7 +117,7 @@ public class TestShuffleHandlerJobs {
     String outputDirStr = "/tmp/owc-output/";
     Path outputDir = new Path(outputDirStr);
 
-    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    TezConfiguration tezConf = new TezConfiguration(tezCluster.getConfig());
     tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
     tezConf.set(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID, ShuffleHandler.TEZ_SHUFFLE_SERVICEID);
     tezConf.setBoolean(TezConfiguration.TEZ_AM_DAG_DELETE_ENABLED, true);
@@ -138,7 +132,7 @@ public class TestShuffleHandlerJobs {
               inputDirStr, outputDirStr, "10"}, tezSession)==0);
       verifyOutput(outputDir, remoteFs);
       tezSession.stop();
-      ClientRMService rmService = mrrTezCluster.getResourceManager().getClientRMService();
+      ClientRMService rmService = tezCluster.getResourceManager().getClientRMService();
       boolean isAppComplete = false;
       while(!isAppComplete) {
         GetApplicationReportResponse resp = rmService.getApplicationReport(
@@ -158,7 +152,7 @@ public class TestShuffleHandlerJobs {
         Thread.sleep(100);
       }
       for(int i = 0; i < NUM_NMS; i++) {
-        String appPath = mrrTezCluster.getTestWorkDir() + "/" + this.getClass().getName()
+        String appPath = tezCluster.getTestWorkDir() + "/" + this.getClass().getName()
             + "-localDir-nm-" + i + "_0/usercache/" + UserGroupInformation.getCurrentUser().getUserName()
             + "/appcache/" + job.getAppId();
         String dagPathStr = appPath + "/dag_1";

http://git-wip-us.apache.org/repos/asf/tez/blob/e5d01a60/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java
index 88dacb9..6014b84 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java
@@ -106,7 +106,7 @@ public class InputHost extends HostPort {
     inputs.add(srcAttempt);
   }
 
-  public synchronized PartitionToInputs clearAndGetOnePartition() {
+  public synchronized PartitionToInputs clearAndGetOnePartitionRange() {
     for (Map.Entry<PartitionRange, BlockingQueue<InputAttemptIdentifier>> entry :
         partitionToInputs.entrySet()) {
       List<InputAttemptIdentifier> inputs =

http://git-wip-us.apache.org/repos/asf/tez/blob/e5d01a60/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index a23ce72..3436fc7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -63,7 +63,6 @@ import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputContext;
@@ -431,11 +430,11 @@ public class ShuffleManager implements FetcherCallback {
 
     // Remove obsolete inputs from the list being given to the fetcher. Also
     // remove from the obsolete list.
-    PartitionToInputs pendingInputsOfOnePartition = inputHost
-        .clearAndGetOnePartition();
+    PartitionToInputs pendingInputsOfOnePartitionRange = inputHost
+        .clearAndGetOnePartitionRange();
     int includedMaps = 0;
     for (Iterator<InputAttemptIdentifier> inputIter =
-        pendingInputsOfOnePartition.getInputs().iterator();
+        pendingInputsOfOnePartitionRange.getInputs().iterator();
             inputIter.hasNext();) {
       InputAttemptIdentifier input = inputIter.next();
 
@@ -467,8 +466,8 @@ public class ShuffleManager implements FetcherCallback {
       if (includedMaps >= maxTaskOutputAtOnce) {
         inputIter.remove();
         //add to inputHost
-        inputHost.addKnownInput(pendingInputsOfOnePartition.getPartition(), pendingInputsOfOnePartition.getPartitionCount(),
-            input);
+        inputHost.addKnownInput(pendingInputsOfOnePartitionRange.getPartition(),
+            pendingInputsOfOnePartitionRange.getPartitionCount(), input);
       } else {
         includedMaps++;
       }
@@ -477,13 +476,13 @@ public class ShuffleManager implements FetcherCallback {
       pendingHosts.add(inputHost); //add it to queue
     }
     fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(),
-        pendingInputsOfOnePartition.getPartition(),
-        pendingInputsOfOnePartition.getPartitionCount(),
-            pendingInputsOfOnePartition.getInputs());
+        pendingInputsOfOnePartitionRange.getPartition(),
+        pendingInputsOfOnePartitionRange.getPartitionCount(),
+            pendingInputsOfOnePartitionRange.getInputs());
     if (LOG.isDebugEnabled()) {
       LOG.debug("Created Fetcher for host: " + inputHost.getHost()
           + ", info: " + inputHost.getAdditionalInfo()
-          + ", with inputs: " + pendingInputsOfOnePartition);
+          + ", with inputs: " + pendingInputsOfOnePartitionRange);
     }
     return fetcherBuilder.build();
   }


Mime
View raw message