tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject [1/2] TEZ-1267. Exception handling for VertexManager (zjffdu)
Date Wed, 22 Oct 2014 23:50:12 GMT
Repository: tez
Updated Branches:
  refs/heads/master e530a46c3 -> b4674b485


http://git-wip-us.apache.org/repos/asf/tez/blob/b4674b48/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
index 38d5998..44b557b 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
@@ -41,18 +41,29 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSinkDescriptor;
 import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.app.dag.impl.RootInputVertexManager;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.AbstractLogicalOutput;
@@ -68,7 +79,9 @@ import org.apache.tez.runtime.api.Reader;
 import org.apache.tez.runtime.api.Writer;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.test.TestAMRecovery.DoNothingProcessor;
 import org.junit.Test;
 
 public class TestExceptionPropagation {
@@ -168,7 +181,6 @@ public class TestExceptionPropagation {
     tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
     tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m");
     tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false);
-
     tezClient = TezClient.create("TestExceptionPropagation", tezConf);
     tezClient.start();
   }
@@ -280,16 +292,21 @@ public class TestExceptionPropagation {
     // PROCESSOR_HANDLE_EVENTS
     PROCESSOR_RUN_ERROR, PROCESSOR_CLOSE_ERROR, PROCESSOR_INITIALIZE_ERROR,
     PROCESSOR_RUN_EXCEPTION, PROCESSOR_CLOSE_EXCEPTION, PROCESSOR_INITIALIZE_EXCEPTION,
+
+    // VM
+    VM_INITIALIZE, VM_ON_ROOTVERTEX_INITIALIZE,VM_ON_SOURCETASK_COMPLETED, VM_ON_VERTEX_STARTED,
+    VM_ON_VERTEXMANAGEREVENT_RECEIVED,
   }
 
   /**
-   * create a DAG with single vertex, set payload on Input/Output/Processor to
+   * create a DAG with 2 vertices (v1 --> v2), set payload on Input/Output/Processor/VertexManagerPlugin to
    * control where throw exception
    * 
    * @param exLocation
    * @return
+   * @throws IOException
    */
-  private DAG createDAG(ExceptionLocation exLocation) {
+  private DAG createDAG(ExceptionLocation exLocation) throws IOException {
     DAG dag = DAG.create("dag_" + exLocation.name());
     UserPayload payload =
         UserPayload.create(ByteBuffer.wrap(exLocation.name().getBytes()));
@@ -302,7 +319,17 @@ public class TestExceptionPropagation {
         DataSourceDescriptor.create(inputDesc, iiDesc, null));
     OutputDescriptor outputDesc = OutputWithException.getOutputDesc(payload);
     v1.addDataSink("output", DataSinkDescriptor.create(outputDesc, null, null));
-    dag.addVertex(v1);
+    v1.setVertexManagerPlugin(RootInputVertexManagerWithException.getVMDesc(payload));
+
+    Vertex v2 = 
+        Vertex.create("v2", DoNothingProcessor.getProcDesc(), 1);
+    v2.setVertexManagerPlugin(ShuffleVertexManagerWithException.getVMDesc(exLocation));
+
+    dag.addVertex(v1)
+      .addVertex(v2)
+      .addEdge(Edge.create(v1, v2, EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+        DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+        OutputWithException.getOutputDesc(payload), InputWithException.getInputDesc(payload))));
     return dag;
   }
 
@@ -438,8 +465,14 @@ public class TestExceptionPropagation {
     public List<Event> close() throws Exception {
       if (this.exLocation == ExceptionLocation.OUTPUT_CLOSE) {
         throw new RuntimeException(this.exLocation.name());
+      } else if (this.exLocation == ExceptionLocation.VM_ON_VERTEXMANAGEREVENT_RECEIVED) {
+        // send VertexManagerEvent to v2
+        List<Event> events = new ArrayList<Event>();
+        events.add(VertexManagerEvent.create("v2", ByteBuffer.wrap(new byte[0])));
+        return events;
+      } else {
+        return null;
       }
-      return null;
     }
 
     @Override
@@ -513,4 +546,92 @@ public class TestExceptionPropagation {
           .setUserPayload(payload);
     }
   }
+
+  public static class RootInputVertexManagerWithException extends RootInputVertexManager {
+
+    private ExceptionLocation exLocation;
+
+    public RootInputVertexManagerWithException(VertexManagerPluginContext context) {
+      super(context);
+    }
+
+    @Override
+    public void initialize() {
+      super.initialize();
+      this.exLocation =
+          ExceptionLocation.valueOf(new String(getContext().getUserPayload()
+              .deepCopyAsArray()));
+      if (this.exLocation == ExceptionLocation.VM_INITIALIZE) {
+        throw new RuntimeException(this.exLocation.name());
+      }
+    }
+
+    @Override
+    public void onRootVertexInitialized(String inputName,
+        InputDescriptor inputDescriptor, List<Event> events) {
+      if (this.exLocation == ExceptionLocation.VM_ON_ROOTVERTEX_INITIALIZE) {
+        throw new RuntimeException(this.exLocation.name());
+      }
+      super.onRootVertexInitialized(inputName, inputDescriptor, events);
+    }
+
+    @Override
+    public void onVertexStarted(Map<String, List<Integer>> completions) {
+      if (this.exLocation == ExceptionLocation.VM_ON_VERTEX_STARTED) {
+        throw new RuntimeException(this.exLocation.name());
+      }
+      super.onVertexStarted(completions);
+    }
+
+    public static VertexManagerPluginDescriptor getVMDesc(UserPayload payload) {
+      return VertexManagerPluginDescriptor.create(RootInputVertexManagerWithException.class.getName())
+              .setUserPayload(payload);
+    }
+  }
+
+  public static class ShuffleVertexManagerWithException extends ShuffleVertexManager {
+
+    private ExceptionLocation exLocation;
+    private static final String Test_ExceptionLocation = "Test.ExceptionLocation";
+
+    public ShuffleVertexManagerWithException(VertexManagerPluginContext context) {
+      super(context);
+    }
+
+    @Override
+    public void initialize() {
+      super.initialize();
+      Configuration conf;
+      try {
+        conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
+        this.exLocation = ExceptionLocation.valueOf(conf.get(Test_ExceptionLocation));
+      } catch (IOException e) {
+        throw new TezUncheckedException(e);
+      }
+    }
+
+    @Override
+    public void onSourceTaskCompleted(String srcVertexName, Integer attemptId) {
+      if (this.exLocation == ExceptionLocation.VM_ON_SOURCETASK_COMPLETED) {
+        throw new RuntimeException(this.exLocation.name());
+      }
+      super.onSourceTaskCompleted(srcVertexName, attemptId);
+    }
+
+    @Override
+    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+      if (this.exLocation == ExceptionLocation.VM_ON_VERTEXMANAGEREVENT_RECEIVED) {
+        throw new RuntimeException(this.exLocation.name());
+      }
+      super.onVertexManagerEventReceived(vmEvent);
+    }
+
+    public static VertexManagerPluginDescriptor getVMDesc(ExceptionLocation exLocation) throws IOException {
+      Configuration conf = new Configuration();
+      conf.set(Test_ExceptionLocation, exLocation.name());
+      UserPayload payload = TezUtils.createUserPayloadFromConf(conf);
+      return VertexManagerPluginDescriptor.create(ShuffleVertexManagerWithException.class.getName())
+              .setUserPayload(payload);
+    }
+  }
 }


Mime
View raw message