Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A8DBD10AF1 for ; Fri, 11 Apr 2014 19:03:15 +0000 (UTC) Received: (qmail 88737 invoked by uid 500); 11 Apr 2014 19:03:14 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 88697 invoked by uid 500); 11 Apr 2014 19:03:14 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 88690 invoked by uid 99); 11 Apr 2014 19:03:13 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Apr 2014 19:03:12 +0000 X-ASF-Spam-Status: No, hits=-2000.3 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD,T_FILL_THIS_FORM_SHORT X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 11 Apr 2014 19:03:11 +0000 Received: (qmail 87865 invoked by uid 99); 11 Apr 2014 19:02:50 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Apr 2014 19:02:50 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id AE64598B2E3; Fri, 11 Apr 2014 19:02:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bikas@apache.org To: commits@tez.incubator.apache.org Message-Id: <31853d0bd1ea4531a1493443088ccfc8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-1045. TezMiniCluster tests can fail intermittently (bikas) Date: Fri, 11 Apr 2014 19:02:50 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-tez Updated Branches: refs/heads/master 0ecf1d136 -> a786e41a3 TEZ-1045. TezMiniCluster tests can fail intermittently (bikas) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/a786e41a Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/a786e41a Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/a786e41a Branch: refs/heads/master Commit: a786e41a3c0a164c649ef4922ebbc5a5f3d59201 Parents: 0ecf1d1 Author: Bikas Saha Authored: Fri Apr 11 12:02:48 2014 -0700 Committer: Bikas Saha Committed: Fri Apr 11 12:02:48 2014 -0700 ---------------------------------------------------------------------- .../library/common/shuffle/impl/Fetcher.java | 4 ++-- .../library/common/shuffle/impl/MapHost.java | 14 +++++++------- .../shuffle/impl/ShuffleInputEventHandler.java | 3 ++- .../common/shuffle/impl/ShuffleScheduler.java | 18 ++++++++++-------- .../runtime/library/shuffle/common/InputHost.java | 10 ++++++++++ .../shuffle/common/impl/ShuffleManager.java | 14 ++++++++------ .../java/org/apache/tez/test/MiniTezCluster.java | 2 ++ 7 files changed, 41 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a786e41a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java index ac2c5b7..9462d95 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java @@ -472,7 +472,7 @@ class Fetcher extends Thread { } LOG.warn("Failed to shuffle output of " + srcAttemptId + - " from " + host.getHostName(), ioe); + " from " + host.getHostIdentifier(), ioe); // Inform the shuffle-scheduler mapOutput.abort(); @@ -668,7 +668,7 @@ class Fetcher extends Thread { if (bytesLeft != 0) { throw new IOException("Incomplete map output received for " + mapOutput.getAttemptIdentifier() + " from " + - host.getHostName() + " (" + + host.getHostIdentifier() + " (" + bytesLeft + " bytes missing of " + compressedLength + ")" ); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a786e41a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapHost.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapHost.java index b8be657..aa7309a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapHost.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/MapHost.java @@ -34,18 +34,18 @@ class MapHost { } private State state = State.IDLE; - private final String hostName; + private final String hostIdentifier; private final int partitionId; private final String baseUrl; private final String identifier; // Tracks attempt IDs private List maps = new ArrayList(); - public MapHost(int partitionId, String hostName, String baseUrl) { + public MapHost(int partitionId, String hostPort, String baseUrl) { this.partitionId = partitionId; - this.hostName = hostName; + this.hostIdentifier = hostPort; this.baseUrl = baseUrl; - this.identifier = createIdentifier(hostName, partitionId); + this.identifier = createIdentifier(hostPort, partitionId); } public static String createIdentifier(String hostName, int partitionId) { @@ -64,8 +64,8 @@ class MapHost { return state; } - public String getHostName() { - return hostName; + public String getHostIdentifier() { + return hostIdentifier; } public String getBaseUrl() { @@ -112,7 +112,7 @@ class MapHost { @Override public String toString() { - return hostName; + return hostIdentifier; } /** http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a786e41a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java index f11575b..cfad1fc 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleInputEventHandler.java @@ -99,7 +99,8 @@ public class ShuffleInputEventHandler { "the empty partition to succeeded", e); } } - scheduler.addKnownMapOutput(shufflePayload.getHost(), partitionId, baseUri.toString(), srcAttemptIdentifier); + scheduler.addKnownMapOutput(shufflePayload.getHost(), shufflePayload.getPort(), + partitionId, baseUri.toString(), srcAttemptIdentifier); } private void processTaskFailedEvent(InputFailedEvent ifEvent) { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a786e41a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java index 993489e..c2bc903 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleScheduler.java @@ -163,7 +163,7 @@ class ShuffleScheduler { ) throws IOException { failureCounts.remove(srcAttemptIdentifier); if (host != null) { - hostFailures.remove(host.getHostName()); + hostFailures.remove(host.getHostIdentifier()); } if (!isInputFinished(srcAttemptIdentifier.getInputIdentifier().getInputIndex())) { @@ -231,14 +231,14 @@ class ShuffleScheduler { } else { failureCounts.put(srcAttempt, new IntWritable(1)); } - String hostname = host.getHostName(); + String hostPort = host.getHostIdentifier(); // TODO TEZ-922 hostFailures isn't really used for anything. Factor it into error // reporting / potential blacklisting of hosts. - if (hostFailures.containsKey(hostname)) { - IntWritable x = hostFailures.get(hostname); + if (hostFailures.containsKey(hostPort)) { + IntWritable x = hostFailures.get(hostPort); x.set(x.get() + 1); } else { - hostFailures.put(hostname, new IntWritable(1)); + hostFailures.put(hostPort, new IntWritable(1)); } if (failures >= abortFailureLimit) { // This task has seen too many fetch failures - report it as failed. The @@ -348,14 +348,16 @@ class ShuffleScheduler { } - public synchronized void addKnownMapOutput(String hostName, + public synchronized void addKnownMapOutput(String inputHostName, + int port, int partitionId, String hostUrl, InputAttemptIdentifier srcAttempt) { - String identifier = MapHost.createIdentifier(hostName, partitionId); + String hostPort = (inputHostName + ":" + String.valueOf(port)); + String identifier = MapHost.createIdentifier(hostPort, partitionId); MapHost host = mapLocations.get(identifier); if (host == null) { - host = new MapHost(partitionId, hostName, hostUrl); + host = new MapHost(partitionId, hostPort, hostUrl); assert identifier.equals(host.getIdentifier()); mapLocations.put(identifier, host); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a786e41a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java index 59da655..15e45af 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/InputHost.java @@ -37,13 +37,19 @@ public class InputHost { private final String host; private final int port; private final int srcPhysicalIndex; + private final String identifier; private final BlockingQueue inputs = new LinkedBlockingQueue(); + public static String createIdentifier(String host, int port) { + return (host + ":" + String.valueOf(port)); + } + public InputHost(String hostName, int port, ApplicationId appId, int srcPhysicalIndex) { this.host = hostName; this.port = port; this.srcPhysicalIndex = srcPhysicalIndex; + this.identifier = createIdentifier(hostName, port); } public String getHost() { @@ -53,6 +59,10 @@ public class InputHost { public int getPort() { return this.port; } + + public String getIdentifier() { + return this.identifier; + } public int getSrcPhysicalIndex() { return this.srcPhysicalIndex; http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a786e41a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java index bde9bfe..050c0c0 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/shuffle/common/impl/ShuffleManager.java @@ -263,7 +263,7 @@ public class ShuffleManager implements FetcherCallback { LOG.debug("Processing pending host: " + inputHost.toDetailedString()); } if (inputHost.getNumPendingInputs() > 0) { - LOG.info("Scheduling fetch for inputHost: " + inputHost.getHost()); + LOG.info("Scheduling fetch for inputHost: " + inputHost.getIdentifier()); Fetcher fetcher = constructFetcherForHost(inputHost); numRunningFetchers.incrementAndGet(); if (isShutdown.get()) { @@ -277,7 +277,7 @@ public class ShuffleManager implements FetcherCallback { } } else { if (LOG.isDebugEnabled()) { - LOG.debug("Skipping host: " + inputHost.getHost() + LOG.debug("Skipping host: " + inputHost.getIdentifier() + " since it has no inputs to process"); } } @@ -336,10 +336,12 @@ public class ShuffleManager implements FetcherCallback { public void addKnownInput(String hostName, int port, InputAttemptIdentifier srcAttemptIdentifier, int srcPhysicalIndex) { - InputHost host = knownSrcHosts.get(hostName); + String identifier = InputHost.createIdentifier(hostName, port); + InputHost host = knownSrcHosts.get(identifier); if (host == null) { host = new InputHost(hostName, port, inputContext.getApplicationId(), srcPhysicalIndex); - InputHost old = knownSrcHosts.putIfAbsent(hostName, host); + assert identifier.equals(host.getIdentifier()); + InputHost old = knownSrcHosts.putIfAbsent(identifier, host); if (old != null) { host = old; } @@ -352,7 +354,7 @@ public class ShuffleManager implements FetcherCallback { try { boolean added = pendingHosts.offer(host); if (!added) { - String errorMessage = "Unable to add host: " + host.getHost() + " to pending queue"; + String errorMessage = "Unable to add host: " + host.getIdentifier() + " to pending queue"; LOG.error(errorMessage); throw new TezUncheckedException(errorMessage); } @@ -665,7 +667,7 @@ public class ShuffleManager implements FetcherCallback { public void onSuccess(FetchResult result) { Iterable pendingInputs = result.getPendingInputs(); if (pendingInputs != null && pendingInputs.iterator().hasNext()) { - InputHost inputHost = knownSrcHosts.get(result.getHost()); + InputHost inputHost = knownSrcHosts.get(InputHost.createIdentifier(result.getHost(), result.getPort())); assert inputHost != null; for (InputAttemptIdentifier input : pendingInputs) { inputHost.addKnownInput(input); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/a786e41a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java index 62553a4..4803e27 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java +++ b/tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java @@ -76,6 +76,8 @@ public class MiniTezCluster extends MiniYARNCluster { @Override public void serviceInit(Configuration conf) throws Exception { conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME); + // blacklisting disabled to prevent scheduling issues + conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false); if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) { conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(), "apps_staging_dir" + Path.SEPARATOR).getAbsolutePath());