Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 908DA17B10 for ; Fri, 3 Apr 2015 00:45:20 +0000 (UTC) Received: (qmail 85158 invoked by uid 500); 3 Apr 2015 00:45:20 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 85127 invoked by uid 500); 3 Apr 2015 00:45:20 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 85118 invoked by uid 99); 3 Apr 2015 00:45:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Apr 2015 00:45:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 24DE2E18C7; Fri, 3 Apr 2015 00:45:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rbalamohan@apache.org To: commits@tez.apache.org Message-Id: <9dd3f5b372444eab8e244f36ccc376d6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-2251. Race condition in VertexImpl & Edge causes DAG to hang (rbalamohan) Date: Fri, 3 Apr 2015 00:45:20 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master aec3f6b66 -> 212de07db TEZ-2251. Race condition in VertexImpl & Edge causes DAG to hang (rbalamohan) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/212de07d Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/212de07d Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/212de07d Branch: refs/heads/master Commit: 212de07db49d0a0be923dc685e453f111488b99d Parents: aec3f6b Author: Rajesh Balamohan Authored: Fri Apr 3 06:14:48 2015 +0530 Committer: Rajesh Balamohan Committed: Fri Apr 3 06:14:48 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/app/dag/impl/VertexImpl.java | 51 ++++++++++++++++---- 2 files changed, 43 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/212de07d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f092bd0..e844f60 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES TEZ-2176. Move all logging to slf4j. (commons-logging jar no longer part of Tez tar) ALL CHANGES: + TEZ-2251. Race condition in VertexImpl & Edge causes DAG to hang. TEZ-2264. Remove unused taskUmbilical reference in TezTaskRunner, register as running late. TEZ-2149. Optimizations for the timed version of DAGClient.getStatus. TEZ-2213. For the ordered case, enabling pipelined shuffle should automatically disable final merge. http://git-wip-us.apache.org/repos/asf/tez/blob/212de07d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index b254c8a..7f124b4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -1352,7 +1352,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, + parallelism + " for vertex: " + logIdentifier); setVertexLocationHint(vertexLocationHint); writeLock.lock(); - + try { if (parallelismSet == true) { String msg = "Parallelism can only be set dynamically once per vertex: " + logIdentifier; @@ -4186,12 +4186,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, rootInputDescriptorEntry.getKey()).getNumPhysicalInputsForWorkUnit(taskIndex))); } } - for (Entry entry : this.getInputVertices().entrySet()) { - InputSpec inputSpec = entry.getValue().getDestinationSpec(taskIndex); - if (LOG.isDebugEnabled()) { - LOG.debug("For vertex : " + this.getLogIdentifier() - + ", Using InputSpec : " + inputSpec); - } + for(Vertex vertex : getInputVertices().keySet()) { + /** + * It is possible that setParallelism is in the middle of processing in target vertex with + * its write lock. So we need to get inputspec by acquiring read lock in target vertex to + * get consistent view. + * Refer TEZ-2251 + */ + InputSpec inputSpec = ((VertexImpl) vertex).getDestinationSpecFor(this, taskIndex); // TODO DAGAM This should be based on the edge type. inputSpecList.add(inputSpec); } @@ -4204,13 +4206,44 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, List outputSpecList = new ArrayList(this.getOutputVerticesCount() + this.additionalOutputSpecs.size()); outputSpecList.addAll(additionalOutputSpecs); - for (Entry entry : this.getOutputVertices().entrySet()) { - OutputSpec outputSpec = entry.getValue().getSourceSpec(taskIndex); + for(Vertex vertex : targetVertices.keySet()) { + /** + * It is possible that setParallelism (which could change numTasks) is in the middle of + * processing in target vertex with its write lock. So we need to get outputspec by + * acquiring read lock in target vertex to get consistent view. + * Refer TEZ-2251 + */ + OutputSpec outputSpec = ((VertexImpl) vertex).getSourceSpecFor(this, taskIndex); outputSpecList.add(outputSpec); } return outputSpecList; } + private OutputSpec getSourceSpecFor(VertexImpl vertex, int taskIndex) throws + AMUserCodeException { + readLock.lock(); + try { + Edge edge = sourceVertices.get(vertex); + Preconditions.checkState(edge != null, getLogIdentifier()); + return edge.getSourceSpec(taskIndex); + } finally { + readLock.unlock(); + } + } + + private InputSpec getDestinationSpecFor(VertexImpl vertex, int taskIndex) throws + AMUserCodeException { + readLock.lock(); + try { + Edge edge = targetVertices.get(vertex); + Preconditions.checkState(edge != null, getLogIdentifier()); + return edge.getDestinationSpec(taskIndex); + } finally { + readLock.unlock(); + } + } + + //TODO Eventually remove synchronization. @Override public synchronized List getGroupInputSpecList(int taskIndex) {