tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-1667. Add a system test for InitializerEvents. (sseth)
Date Tue, 21 Oct 2014 21:32:59 GMT
Repository: tez
Updated Branches:
  refs/heads/master 244f12f66 -> 99f2b8ee9


TEZ-1667. Add a system test for InitializerEvents. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/99f2b8ee
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/99f2b8ee
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/99f2b8ee

Branch: refs/heads/master
Commit: 99f2b8ee94cd9ed551fa380ed0411b4cc69f6d77
Parents: 244f12f
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Oct 21 14:32:44 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Oct 21 14:32:44 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../api/events/InputInitializerEvent.java       |   6 +
 .../java/org/apache/tez/test/TestTezJobs.java   | 138 +++++++++++++++++++
 3 files changed, 145 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/99f2b8ee/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c951e8e..9f7a738 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -45,6 +45,7 @@ ALL CHANGES:
   TEZ-1584. Restore counters from DAGFinishedEvent when DAG is completed.
   TEZ-1525. BroadcastLoadGen testcase.
   TEZ-1686. TestRecoveryParser.testGetLastCompletedDAG fails sometimes
+  TEZ-1667. Add a system test for InitializerEvents.
 
 Release 0.5.1: 2014-10-02
 

http://git-wip-us.apache.org/repos/asf/tez/blob/99f2b8ee/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
index 8360447..3037e61 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
@@ -111,4 +111,10 @@ public class InputInitializerEvent extends Event {
   public String getSourceVertexName() {
     return this.sourceVertexName;
   }
+
+  @Override
+  public String toString() {
+    return "[TargetVertexName=" + targetVertexName + ", TargetInputName=" + targetInputName +
+        ", SourceVertexName=" + sourceVertexName + " Payload=" + eventPayload + "]";
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/99f2b8ee/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index 484ca7e..80110b8 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -28,10 +28,17 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -46,15 +53,31 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.examples.OrderedWordCount;
 import org.apache.tez.examples.SimpleSessionExample;
 import org.apache.tez.examples.JoinDataGen;
 import org.apache.tez.examples.HashJoinExample;
 import org.apache.tez.examples.JoinValidate;
 import org.apache.tez.examples.SortMergeJoinExample;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.InputInitializer;
+import org.apache.tez.runtime.api.InputInitializerContext;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.library.processor.SimpleProcessor;
+import org.apache.tez.runtime.library.processor.SleepProcessor;
+import org.apache.tez.test.dag.MultiAttemptDAG;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -536,4 +559,119 @@ public class TestTezJobs {
       }
     }
   }
+
+  @Test(timeout = 60000)
+  public void testInputInitializerEvents() throws TezException, InterruptedException, IOException {
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    TezClient tezClient = TezClient.create("TestInputInitializerEvents", tezConf);
+    tezClient.start();
+
+    try {
+      DAG dag = DAG.create("TestInputInitializerEvents");
+      Vertex vertex1 = Vertex.create(VERTEX_WITH_INITIALIZER_NAME, ProcessorDescriptor.create(
+          SleepProcessor.class.getName())
+          .setUserPayload(new SleepProcessor.SleepProcessorConfig(1).toUserPayload()), 1)
+          .addDataSource(INPUT1_NAME,
+              DataSourceDescriptor
+                  .create(InputDescriptor.create(MultiAttemptDAG.NoOpInput.class.getName()),
+                      InputInitializerDescriptor.create(InputInitializerForTest.class.getName()),
+                      null));
+      Vertex vertex2 = Vertex.create(EVENT_GENERATING_VERTEX_NAME,
+          ProcessorDescriptor.create(InputInitializerEventGeneratingProcessor.class.getName()), 5);
+
+      dag.addVertex(vertex1).addVertex(vertex2);
+
+      DAGClient dagClient = tezClient.submitDAG(dag);
+      dagClient.waitForCompletion();
+      Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+    } finally {
+      tezClient.stop();
+    }
+  }
+
+  private static final String VERTEX_WITH_INITIALIZER_NAME = "VertexWithInitializer";
+  private static final String EVENT_GENERATING_VERTEX_NAME = "EventGeneratingVertex";
+  private static final String INPUT1_NAME = "Input1";
+
+  public static class InputInitializerEventGeneratingProcessor extends SimpleProcessor {
+
+    public InputInitializerEventGeneratingProcessor(
+        ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      if (getContext().getTaskIndex() == 1 && getContext().getTaskAttemptNumber() == 0) {
+        throw new IOException("Failing task 2, attempt 0");
+      }
+
+      InputInitializerEvent initializerEvent = InputInitializerEvent.create(
+          VERTEX_WITH_INITIALIZER_NAME, INPUT1_NAME,
+          ByteBuffer.allocate(4).putInt(0, getContext().getTaskIndex()));
+      List<Event> events = Lists.newArrayList();
+      events.add(initializerEvent);
+      getContext().sendEvents(events);
+    }
+  }
+
+  public static class InputInitializerForTest extends InputInitializer {
+
+    private final ReentrantLock lock = new ReentrantLock();
+    private final Condition condition = lock.newCondition();
+    private final BitSet eventsSeen = new BitSet();
+
+    public InputInitializerForTest(
+        InputInitializerContext initializerContext) {
+      super(initializerContext);
+    }
+
+    @Override
+    public List<Event> initialize() throws Exception {
+      getContext().registerForVertexStateUpdates(EVENT_GENERATING_VERTEX_NAME, EnumSet.of(
+          VertexState.SUCCEEDED));
+      lock.lock();
+      try {
+        condition.await();
+      } finally {
+        lock.unlock();
+      }
+      return null;
+    }
+
+
+    @Override
+    public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
+      lock.lock();
+      try {
+        for (InputInitializerEvent event : events) {
+          Preconditions.checkArgument(
+              event.getSourceVertexName().equals(EVENT_GENERATING_VERTEX_NAME));
+          int index = event.getUserPayload().getInt(0);
+          Preconditions.checkState(!eventsSeen.get(index));
+          eventsSeen.set(index);
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+      lock.lock();
+      try {
+        Preconditions.checkArgument(stateUpdate.getVertexState() == VertexState.SUCCEEDED);
+        if (eventsSeen.cardinality() ==
+            getContext().getVertexNumTasks(EVENT_GENERATING_VERTEX_NAME)) {
+          condition.signal();
+        } else {
+          throw new IllegalStateException(
+              "Received VertexState SUCCEEDED before receiving all InputInitializerEvents");
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
 }


Mime
View raw message