tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [06/23] git commit: TEZ-1045. TezMiniCluster tests can fail intermittently (bikas)
Date Fri, 20 Jun 2014 22:35:44 GMT
TEZ-1045. TezMiniCluster tests can fail intermittently (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/7423ed27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/7423ed27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/7423ed27

Branch: refs/heads/branch-0.4.1-incubating
Commit: 7423ed27ede110caf818d006973708504d75da21
Parents: 8a592a6
Author: Bikas Saha <bikas@apache.org>
Authored: Fri Apr 11 12:02:48 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Fri Jun 20 15:34:45 2014 -0700

----------------------------------------------------------------------
 .../library/common/shuffle/impl/Fetcher.java      |  4 ++--
 .../library/common/shuffle/impl/MapHost.java      | 14 +++++++-------
 .../shuffle/impl/ShuffleInputEventHandler.java    |  3 ++-
 .../common/shuffle/impl/ShuffleScheduler.java     | 18 ++++++++++--------
 .../runtime/library/shuffle/common/InputHost.java | 10 ++++++++++
 .../shuffle/common/impl/ShuffleManager.java       | 14 ++++++++------
 .../java/org/apache/tez/test/MiniTezCluster.java  |  2 ++
 7 files changed, 41 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7423ed27/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index ac2c5b7..9462d95 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -472,7 +472,7 @@ class Fetcher extends Thread {
       }
       
       LOG.warn("Failed to shuffle output of " + srcAttemptId + 
-               " from " + host.getHostName(), ioe); 
+               " from " + host.getHostIdentifier(), ioe); 
 
       // Inform the shuffle-scheduler
       mapOutput.abort();
@@ -668,7 +668,7 @@ class Fetcher extends Thread {
     if (bytesLeft != 0) {
       throw new IOException("Incomplete map output received for " +
                             mapOutput.getAttemptIdentifier() + " from " +
-                            host.getHostName() + " (" + 
+                            host.getHostIdentifier() + " (" + 
                             bytesLeft + " bytes missing of " + 
                             compressedLength + ")"
       );

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7423ed27/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapHost.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapHost.java
index b8be657..aa7309a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapHost.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapHost.java
@@ -34,18 +34,18 @@ class MapHost {
   }
   
   private State state = State.IDLE;
-  private final String hostName;
+  private final String hostIdentifier;
   private final int partitionId;
   private final String baseUrl;
   private final String identifier;
   // Tracks attempt IDs
   private List<InputAttemptIdentifier> maps = new ArrayList<InputAttemptIdentifier>();
   
-  public MapHost(int partitionId, String hostName, String baseUrl) {
+  public MapHost(int partitionId, String hostPort, String baseUrl) {
     this.partitionId = partitionId;
-    this.hostName = hostName;
+    this.hostIdentifier = hostPort;
     this.baseUrl = baseUrl;
-    this.identifier = createIdentifier(hostName, partitionId);
+    this.identifier = createIdentifier(hostPort, partitionId);
   }
   
   public static String createIdentifier(String hostName, int partitionId) {
@@ -64,8 +64,8 @@ class MapHost {
     return state;
   }
 
-  public String getHostName() {
-    return hostName;
+  public String getHostIdentifier() {
+    return hostIdentifier;
   }
 
   public String getBaseUrl() {
@@ -112,7 +112,7 @@ class MapHost {
   
   @Override
   public String toString() {
-    return hostName;
+    return hostIdentifier;
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7423ed27/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
index f11575b..cfad1fc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -99,7 +99,8 @@ public class ShuffleInputEventHandler {
                 "the empty partition to succeeded", e);
       }
     }
-    scheduler.addKnownMapOutput(shufflePayload.getHost(), partitionId, baseUri.toString(), srcAttemptIdentifier);
+    scheduler.addKnownMapOutput(shufflePayload.getHost(), shufflePayload.getPort(), 
+        partitionId, baseUri.toString(), srcAttemptIdentifier);
   }
   
   private void processTaskFailedEvent(InputFailedEvent ifEvent) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7423ed27/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
index 993489e..c2bc903 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java
@@ -163,7 +163,7 @@ class ShuffleScheduler {
                                          ) throws IOException {
     failureCounts.remove(srcAttemptIdentifier);
     if (host != null) {
-      hostFailures.remove(host.getHostName());
+      hostFailures.remove(host.getHostIdentifier());
     }
     
     if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) {
@@ -231,14 +231,14 @@ class ShuffleScheduler {
     } else {
       failureCounts.put(srcAttempt, new IntWritable(1));      
     }
-    String hostname = host.getHostName();
+    String hostPort = host.getHostIdentifier();
     // TODO TEZ-922 hostFailures isn't really used for anything. Factor it into error
     // reporting / potential blacklisting of hosts.
-    if (hostFailures.containsKey(hostname)) {
-      IntWritable x = hostFailures.get(hostname);
+    if (hostFailures.containsKey(hostPort)) {
+      IntWritable x = hostFailures.get(hostPort);
       x.set(x.get() + 1);
     } else {
-      hostFailures.put(hostname, new IntWritable(1));
+      hostFailures.put(hostPort, new IntWritable(1));
     }
     if (failures >= abortFailureLimit) {
       // This task has seen too many fetch failures - report it as failed. The
@@ -348,14 +348,16 @@ class ShuffleScheduler {
 
   }
   
-  public synchronized void addKnownMapOutput(String hostName,
+  public synchronized void addKnownMapOutput(String inputHostName,
+                                             int port,
                                              int partitionId,
                                              String hostUrl,
                                              InputAttemptIdentifier srcAttempt) {
-    String identifier = MapHost.createIdentifier(hostName, partitionId);
+    String hostPort = (inputHostName + ":" + String.valueOf(port));
+    String identifier = MapHost.createIdentifier(hostPort, partitionId);
     MapHost host = mapLocations.get(identifier);
     if (host == null) {
-      host = new MapHost(partitionId, hostName, hostUrl);
+      host = new MapHost(partitionId, hostPort, hostUrl);
       assert identifier.equals(host.getIdentifier());
       mapLocations.put(identifier, host);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7423ed27/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
index 59da655..15e45af 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java
@@ -37,13 +37,19 @@ public class InputHost {
   private final String host;
   private final int port;
   private final int srcPhysicalIndex;
+  private final String identifier;
 
   private final BlockingQueue<InputAttemptIdentifier> inputs = new LinkedBlockingQueue<InputAttemptIdentifier>();
 
+  public static String createIdentifier(String host, int port) {
+    return (host + ":" + String.valueOf(port));
+  }
+  
  public InputHost(String hostName, int port, ApplicationId appId, int srcPhysicalIndex) {
     this.host = hostName;
     this.port = port;
     this.srcPhysicalIndex = srcPhysicalIndex;
+    this.identifier = createIdentifier(hostName, port);
   }
 
   public String getHost() {
@@ -53,6 +59,10 @@ public class InputHost {
   public int getPort() {
     return this.port;
   }
+  
+  public String getIdentifier() {
+    return this.identifier;
+  }
 
   public int getSrcPhysicalIndex() {
     return this.srcPhysicalIndex;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7423ed27/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
index bde9bfe..050c0c0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java
@@ -263,7 +263,7 @@ public class ShuffleManager implements FetcherCallback {
                 LOG.debug("Processing pending host: " + inputHost.toDetailedString());
               }
               if (inputHost.getNumPendingInputs() > 0) {
-                LOG.info("Scheduling fetch for inputHost: " + inputHost.getHost());
+                LOG.info("Scheduling fetch for inputHost: " + inputHost.getIdentifier());
                 Fetcher fetcher = constructFetcherForHost(inputHost);
                 numRunningFetchers.incrementAndGet();
                 if (isShutdown.get()) {
@@ -277,7 +277,7 @@ public class ShuffleManager implements FetcherCallback {
                 }
               } else {
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug("Skipping host: " + inputHost.getHost()
+                  LOG.debug("Skipping host: " + inputHost.getIdentifier()
                       + " since it has no inputs to process");
                 }
               }
@@ -336,10 +336,12 @@ public class ShuffleManager implements FetcherCallback {
   
   public void addKnownInput(String hostName, int port,
       InputAttemptIdentifier srcAttemptIdentifier, int srcPhysicalIndex) {
-    InputHost host = knownSrcHosts.get(hostName);
+    String identifier = InputHost.createIdentifier(hostName, port);
+    InputHost host = knownSrcHosts.get(identifier);
     if (host == null) {
       host = new InputHost(hostName, port, inputContext.getApplicationId(), srcPhysicalIndex);
-      InputHost old = knownSrcHosts.putIfAbsent(hostName, host);
+      assert identifier.equals(host.getIdentifier());
+      InputHost old = knownSrcHosts.putIfAbsent(identifier, host);
       if (old != null) {
         host = old;
       }
@@ -352,7 +354,7 @@ public class ShuffleManager implements FetcherCallback {
     try {
       boolean added = pendingHosts.offer(host);
       if (!added) {
-        String errorMessage = "Unable to add host: " + host.getHost() + " to pending queue";
+        String errorMessage = "Unable to add host: " + host.getIdentifier() + " to pending queue";
         LOG.error(errorMessage);
         throw new TezUncheckedException(errorMessage);
       }
@@ -665,7 +667,7 @@ public class ShuffleManager implements FetcherCallback {
     public void onSuccess(FetchResult result) {
       Iterable<InputAttemptIdentifier> pendingInputs = result.getPendingInputs();
       if (pendingInputs != null && pendingInputs.iterator().hasNext()) {
-        InputHost inputHost = knownSrcHosts.get(result.getHost());
+        InputHost inputHost = knownSrcHosts.get(InputHost.createIdentifier(result.getHost(), result.getPort()));
         assert inputHost != null;
         for (InputAttemptIdentifier input : pendingInputs) {
           inputHost.addKnownInput(input);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7423ed27/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
index 62553a4..4803e27 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java
@@ -76,6 +76,8 @@ public class MiniTezCluster extends MiniYARNCluster {
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME);
+    // blacklisting disabled to prevent scheduling issues
+    conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
     if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
       conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
           "apps_staging_dir" + Path.SEPARATOR).getAbsolutePath());


Mime
View raw message