Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C7FF1111AE for ; Thu, 20 Feb 2014 23:13:52 +0000 (UTC) Received: (qmail 58858 invoked by uid 500); 20 Feb 2014 23:13:52 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 58818 invoked by uid 500); 20 Feb 2014 23:13:50 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 58811 invoked by uid 99); 20 Feb 2014 23:13:50 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Feb 2014 23:13:50 +0000 X-ASF-Spam-Status: No, hits=-2000.6 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 20 Feb 2014 23:13:49 +0000 Received: (qmail 57300 invoked by uid 99); 20 Feb 2014 23:13:24 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Feb 2014 23:13:24 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 3C9BE92B123; Thu, 20 Feb 2014 23:13:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.incubator.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-863. Queue events for relevant inputs untill the Input has been started. Fixes a potential NPE in case of no auto start. (sseth) Date: Thu, 20 Feb 2014 23:13:24 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-tez Updated Branches: refs/heads/master be0d38988 -> 7e1d11010 TEZ-863. Queue events for relevant inputs untill the Input has been started. Fixes a potential NPE in case of no auto start. (sseth) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/7e1d1101 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/7e1d1101 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/7e1d1101 Branch: refs/heads/master Commit: 7e1d11010802050182835f3cc7a3b86619ee2cb1 Parents: be0d389 Author: Siddharth Seth Authored: Thu Feb 20 15:12:21 2014 -0800 Committer: Siddharth Seth Committed: Thu Feb 20 15:12:21 2014 -0800 ---------------------------------------------------------------------- .../library/input/ShuffledMergedInput.java | 34 +++++++++++++++++--- .../library/input/ShuffledUnorderedKVInput.java | 31 ++++++++++++++++-- 2 files changed, 58 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7e1d1101/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java index 35c1ab5..d70924a 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java @@ -19,7 +19,10 @@ package org.apache.tez.runtime.library.input; import java.io.IOException; import java.util.Collections; +import java.util.LinkedList; import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -60,6 +63,9 @@ public class ShuffledMergedInput implements LogicalInput { protected Configuration conf; protected int numInputs = 0; protected Shuffle shuffle; + private final BlockingQueue pendingEvents = new LinkedBlockingQueue(); + private volatile long firstEventReceivedTime = -1; + // ZZZ LOG THIS TIME @SuppressWarnings("rawtypes") protected ValuesIterator vIter; @@ -88,9 +94,18 @@ public class ShuffledMergedInput implements LogicalInput { @Override public void start() throws IOException { - if (!isStarted.getAndSet(true)) { - // Start the shuffle - copy and merge - shuffle.run(); + synchronized (this) { + if (!isStarted.getAndSet(true)) { + // Start the shuffle - copy and merge + shuffle.run(); + List pending = new LinkedList(); + pendingEvents.drainTo(pending); + if (pending.size() > 0) { + LOG.info("NoAutoStart delay in processing first event: " + + (System.currentTimeMillis() - firstEventReceivedTime)); + shuffle.handleEvents(pending); + } + } } } @@ -123,7 +138,7 @@ public class ShuffledMergedInput implements LogicalInput { @Override public List close() throws IOException { - if (this.numInputs != 0) { + if (this.numInputs != 0 && rawIter != null) { rawIter.close(); } return Collections.emptyList(); @@ -192,6 +207,17 @@ public class ShuffledMergedInput implements LogicalInput { if (numInputs == 0) { throw new RuntimeException("No input events expected as numInputs is 0"); } + if (!isStarted.get()) { + synchronized (this) { + if (!isStarted.get()) { + if (firstEventReceivedTime == -1) { + firstEventReceivedTime = System.currentTimeMillis(); + } + pendingEvents.addAll(inputEvents); + return; + } + } + } shuffle.handleEvents(inputEvents); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7e1d1101/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java index 9123345..f00ef3d 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java @@ -20,7 +20,10 @@ package org.apache.tez.runtime.library.input; import java.io.IOException; import java.util.Collections; +import java.util.LinkedList; import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; @@ -43,6 +46,8 @@ public class ShuffledUnorderedKVInput implements LogicalInput { private Configuration conf; private int numInputs = -1; private BroadcastShuffleManager shuffleManager; + private final BlockingQueue pendingEvents = new LinkedBlockingQueue(); + private volatile long firstEventReceivedTime = -1; @SuppressWarnings("rawtypes") private BroadcastKVReader kvReader; @@ -68,9 +73,18 @@ public class ShuffledUnorderedKVInput implements LogicalInput { @Override public void start() throws IOException { - if (!isStarted.getAndSet(true)) { - this.shuffleManager.run(); - this.kvReader = this.shuffleManager.createReader(); + synchronized (this) { + if (!isStarted.getAndSet(true)) { + this.shuffleManager.run(); + this.kvReader = this.shuffleManager.createReader(); + List pending = new LinkedList(); + pendingEvents.drainTo(pending); + if (pending.size() > 0) { + LOG.info("NoAutoStart delay in processing first event: " + + (System.currentTimeMillis() - firstEventReceivedTime)); + shuffleManager.handleEvents(pending); + } + } } } @@ -102,6 +116,17 @@ public class ShuffledUnorderedKVInput implements LogicalInput { if (numInputs == 0) { throw new RuntimeException("No input events expected as numInputs is 0"); } + if (!isStarted.get()) { + synchronized(this) { + if (!isStarted.get()) { + if (firstEventReceivedTime == -1) { + firstEventReceivedTime = System.currentTimeMillis(); + } + pendingEvents.addAll(inputEvents); + return; + } + } + } shuffleManager.handleEvents(inputEvents); }