tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-1668. InputInitializers should be able to register for Vertex state updates in the constructor. (sseth) (cherry picked from commit 63648fba216fc8e1caff629a023c73c2b8584aa3)
Date Tue, 21 Oct 2014 21:40:09 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.5 8a84fd97b -> 92a586228


TEZ-1668. InputInitializers should be able to register for Vertex state
updates in the constructor. (sseth)
(cherry picked from commit 63648fba216fc8e1caff629a023c73c2b8584aa3)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/92a58622
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/92a58622
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/92a58622

Branch: refs/heads/branch-0.5
Commit: 92a586228d90356ed84054313cce396063adade2
Parents: 8a84fd9
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Oct 21 14:36:01 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Oct 21 14:39:57 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../app/dag/RootInputInitializerManager.java    | 29 +++++++++++++++++++-
 .../java/org/apache/tez/test/TestTezJobs.java   |  4 +--
 3 files changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/92a58622/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 753cb3e..df6940d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -39,6 +39,7 @@ ALL CHANGES:
   TEZ-1525. BroadcastLoadGen testcase.
   TEZ-1686. TestRecoveryParser.testGetLastCompletedDAG fails sometimes
   TEZ-1667. Add a system test for InitializerEvents.
+  TEZ-1668. InputInitializers should be able to register for Vertex state updates in the
constructor.
 
 Release 0.5.1: 2014-10-02
 

http://git-wip-us.apache.org/repos/asf/tez/blob/92a58622/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index 87d4eb6..1f7a83f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -111,6 +111,15 @@ public class RootInputInitializerManager {
 
       InitializerWrapper initializerWrapper =
           new InitializerWrapper(input, initializer, context, vertex, entityStateTracker,
appContext);
+
+      // Register pending vertex update registrations
+      List<VertexUpdateRegistrationHolder> vertexUpdateRegistrations = pendingVertexRegistrations.removeAll(input.getName());
+      if (vertexUpdateRegistrations != null) {
+        for (VertexUpdateRegistrationHolder h : vertexUpdateRegistrations) {
+          initializerWrapper.registerForVertexStateUpdates(h.vertexName, h.stateSet);
+        }
+      }
+
       initializerMap.put(input.getName(), initializerWrapper);
       ListenableFuture<List<Event>> future = executor
           .submit(new InputInitializerCallable(initializerWrapper, dagUgi));
@@ -164,12 +173,30 @@ public class RootInputInitializerManager {
     }
   }
 
+  private static class VertexUpdateRegistrationHolder {
+    private VertexUpdateRegistrationHolder(String vertexName, Set<org.apache.tez.dag.api.event.VertexState>
stateSet) {
+      this.vertexName = vertexName;
+      this.stateSet = stateSet;
+    }
+    private final String vertexName;
+    private final Set<org.apache.tez.dag.api.event.VertexState> stateSet;
+  }
+
+  // This doesn't need to be thread safe, since initializers are not created in separate
threads,
+  // they're only executed in separate threads.
+  private final ListMultimap<String, VertexUpdateRegistrationHolder> pendingVertexRegistrations
=
+      LinkedListMultimap.create();
+
   public void registerForVertexUpdates(String vertexName, String inputName,
                                        @Nullable Set<org.apache.tez.dag.api.event.VertexState>
stateSet) {
     Preconditions.checkNotNull(vertexName, "VertexName cannot be null: " + vertexName);
     Preconditions.checkNotNull(inputName, "InputName cannot be null");
     InitializerWrapper initializer = initializerMap.get(inputName);
-    initializer.registerForVertexStateUpdates(vertexName, stateSet);
+    if (initializer == null) {
+      pendingVertexRegistrations.put(inputName, new VertexUpdateRegistrationHolder(vertexName,
stateSet));
+    } else {
+      initializer.registerForVertexStateUpdates(vertexName, stateSet);
+    }
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/tez/blob/92a58622/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index 80110b8..d1a5f8e 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -625,12 +625,12 @@ public class TestTezJobs {
     public InputInitializerForTest(
         InputInitializerContext initializerContext) {
       super(initializerContext);
+      getContext().registerForVertexStateUpdates(EVENT_GENERATING_VERTEX_NAME, EnumSet.of(
+          VertexState.SUCCEEDED));
     }
 
     @Override
     public List<Event> initialize() throws Exception {
-      getContext().registerForVertexStateUpdates(EVENT_GENERATING_VERTEX_NAME, EnumSet.of(
-          VertexState.SUCCEEDED));
       lock.lock();
       try {
         condition.await();


Mime
View raw message