Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 31FB817CB9 for ; Tue, 21 Oct 2014 21:39:23 +0000 (UTC) Received: (qmail 80432 invoked by uid 500); 21 Oct 2014 21:39:23 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 80397 invoked by uid 500); 21 Oct 2014 21:39:23 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 80387 invoked by uid 99); 21 Oct 2014 21:39:23 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Oct 2014 21:39:23 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id A3D7793BFFF; Tue, 21 Oct 2014 21:39:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Message-Id: <42b47369cbef4f678a79f42cf162608c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: TEZ-1668. InputInitializers should be able to register for Vertex state updates in the constructor. (sseth) Date: Tue, 21 Oct 2014 21:39:22 +0000 (UTC) Repository: tez Updated Branches: refs/heads/master 99f2b8ee9 -> 63648fba2 TEZ-1668. InputInitializers should be able to register for Vertex state updates in the constructor. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/63648fba Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/63648fba Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/63648fba Branch: refs/heads/master Commit: 63648fba216fc8e1caff629a023c73c2b8584aa3 Parents: 99f2b8e Author: Siddharth Seth Authored: Tue Oct 21 14:36:01 2014 -0700 Committer: Siddharth Seth Committed: Tue Oct 21 14:36:01 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../app/dag/RootInputInitializerManager.java | 29 +++++++++++++++++++- .../java/org/apache/tez/test/TestTezJobs.java | 4 +-- 3 files changed, 31 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/63648fba/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9f7a738..73a7cd3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -46,6 +46,7 @@ ALL CHANGES: TEZ-1525. BroadcastLoadGen testcase. TEZ-1686. TestRecoveryParser.testGetLastCompletedDAG fails sometimes TEZ-1667. Add a system test for InitializerEvents. + TEZ-1668. InputInitializers should be able to register for Vertex state updates in the constructor. Release 0.5.1: 2014-10-02 http://git-wip-us.apache.org/repos/asf/tez/blob/63648fba/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java index 87d4eb6..1f7a83f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java @@ -111,6 +111,15 @@ public class RootInputInitializerManager { InitializerWrapper initializerWrapper = new InitializerWrapper(input, initializer, context, vertex, entityStateTracker, appContext); + + // Register pending vertex update registrations + List vertexUpdateRegistrations = pendingVertexRegistrations.removeAll(input.getName()); + if (vertexUpdateRegistrations != null) { + for (VertexUpdateRegistrationHolder h : vertexUpdateRegistrations) { + initializerWrapper.registerForVertexStateUpdates(h.vertexName, h.stateSet); + } + } + initializerMap.put(input.getName(), initializerWrapper); ListenableFuture> future = executor .submit(new InputInitializerCallable(initializerWrapper, dagUgi)); @@ -164,12 +173,30 @@ public class RootInputInitializerManager { } } + private static class VertexUpdateRegistrationHolder { + private VertexUpdateRegistrationHolder(String vertexName, Set stateSet) { + this.vertexName = vertexName; + this.stateSet = stateSet; + } + private final String vertexName; + private final Set stateSet; + } + + // This doesn't need to be thread safe, since initializers are not created in separate threads, + // they're only executed in separate threads. + private final ListMultimap pendingVertexRegistrations = + LinkedListMultimap.create(); + public void registerForVertexUpdates(String vertexName, String inputName, @Nullable Set stateSet) { Preconditions.checkNotNull(vertexName, "VertexName cannot be null: " + vertexName); Preconditions.checkNotNull(inputName, "InputName cannot be null"); InitializerWrapper initializer = initializerMap.get(inputName); - initializer.registerForVertexStateUpdates(vertexName, stateSet); + if (initializer == null) { + pendingVertexRegistrations.put(inputName, new VertexUpdateRegistrationHolder(vertexName, stateSet)); + } else { + initializer.registerForVertexStateUpdates(vertexName, stateSet); + } } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/tez/blob/63648fba/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java index 80110b8..d1a5f8e 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java @@ -625,12 +625,12 @@ public class TestTezJobs { public InputInitializerForTest( InputInitializerContext initializerContext) { super(initializerContext); + getContext().registerForVertexStateUpdates(EVENT_GENERATING_VERTEX_NAME, EnumSet.of( + VertexState.SUCCEEDED)); } @Override public List initialize() throws Exception { - getContext().registerForVertexStateUpdates(EVENT_GENERATING_VERTEX_NAME, EnumSet.of( - VertexState.SUCCEEDED)); lock.lock(); try { condition.await();