tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [17/20] TEZ-443. Merge tez-dag-api and tez-engine-api into a single module - tez-api (part of TEZ-398). (sseth)
Date Mon, 23 Sep 2013 17:45:31 GMT
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
new file mode 100644
index 0000000..dae5625
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.client.rpc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TryKillDAGRequestProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProto;
+import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto;
+
+import com.google.protobuf.ServiceException;
+
+public class DAGClientRPCImpl implements DAGClient {
+  private static final Log LOG = LogFactory.getLog(DAGClientRPCImpl.class);
+
+  private final ApplicationId appId;
+  private final String dagId;
+  private final TezConfiguration conf;
+  private ApplicationReport appReport;
+  private YarnClient yarnClient;
+  private DAGClientAMProtocolBlockingPB proxy = null;
+
+  public DAGClientRPCImpl(ApplicationId appId, String dagId,
+      TezConfiguration conf) {
+    this.appId = appId;
+    this.dagId = dagId;
+    this.conf = conf;
+    yarnClient = new YarnClientImpl();
+    yarnClient.init(new YarnConfiguration(conf));
+    yarnClient.start();
+    appReport = null;
+  }
+
+  @Override
+  public ApplicationId getApplicationId() {
+    return appId;
+  }
+
+  @Override
+  public DAGStatus getDAGStatus() throws IOException, TezException {
+    if(createAMProxyIfNeeded()) {
+      try {
+        return getDAGStatusViaAM();
+      } catch (TezException e) {
+        resetProxy(e); // create proxy again
+      }
+    }
+
+    // Later maybe from History
+    return getDAGStatusViaRM();
+  }
+
+  @Override
+  public VertexStatus getVertexStatus(String vertexName)
+                                    throws IOException, TezException {
+    if(createAMProxyIfNeeded()) {
+      try {
+        return getVertexStatusViaAM(vertexName);
+      } catch (TezException e) {
+        resetProxy(e); // create proxy again
+      }
+    }
+
+    // need AM for this. Later maybe from History
+    return null;
+  }
+
+  @Override
+  public void tryKillDAG() throws TezException, IOException {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("TryKill for app: " + appId + " dag:" + dagId);
+    }
+    if(createAMProxyIfNeeded()) {
+      TryKillDAGRequestProto requestProto =
+          TryKillDAGRequestProto.newBuilder().setDagId(dagId).build();
+      try {
+        proxy.tryKillDAG(null, requestProto);
+      } catch (ServiceException e) {
+        resetProxy(e);
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (this.proxy != null) {
+      RPC.stopProxy(this.proxy);
+    }
+    if(yarnClient != null) {
+      yarnClient.stop();
+    }
+  }
+
+  @Override
+  public ApplicationReport getApplicationReport() {
+    return appReport;
+  }
+
+  void resetProxy(Exception e) {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Resetting AM proxy for app: " + appId + " dag:" + dagId +
+          " due to exception :", e);
+    }
+    proxy = null;
+  }
+
+  DAGStatus getDAGStatusViaAM() throws IOException, TezException {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
+    }
+    GetDAGStatusRequestProto requestProto =
+        GetDAGStatusRequestProto.newBuilder().setDagId(dagId).build();
+    try {
+      return new DAGStatus(
+                 proxy.getDAGStatus(null, requestProto).getDagStatus());
+    } catch (ServiceException e) {
+      // TEZ-151 retrieve wrapped TezException
+      throw new TezException(e);
+    }
+  }
+
+  // Derives a coarse DAGStatus from the application report that yarnClient
+  // returns for appId; the AM proxy is not used on this path.
+  DAGStatus getDAGStatusViaRM() throws TezException, IOException {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("GetDAGStatus via RM for app: " + appId + " dag:" + dagId);
+    }
+    ApplicationReport appReport;
+    try {
+      appReport = yarnClient.getApplicationReport(appId);
+    } catch (YarnException e) {
+      throw new TezException(e);
+    }
+
+    if(appReport == null) {
+      throw new TezException("Unknown/Invalid appId: " + appId);
+    }
+
+    DAGStatusProto.Builder builder = DAGStatusProto.newBuilder();
+    DAGStatus dagStatus = new DAGStatus(builder);
+    DAGStatusStateProto dagState = null;
+    switch (appReport.getYarnApplicationState()) {
+    case NEW:
+    case NEW_SAVING:
+    case SUBMITTED:
+    case ACCEPTED:
+      dagState = DAGStatusStateProto.DAG_SUBMITTED;
+      break;
+    case RUNNING:
+      dagState = DAGStatusStateProto.DAG_RUNNING;
+      break;
+    case FAILED:
+      dagState = DAGStatusStateProto.DAG_FAILED;
+      break;
+    case KILLED:
+      dagState = DAGStatusStateProto.DAG_KILLED;
+      break;
+    case FINISHED:
+      switch(appReport.getFinalApplicationStatus()) {
+      case UNDEFINED:
+      case FAILED:
+        dagState = DAGStatusStateProto.DAG_FAILED;
+        break;
+      case KILLED:
+        dagState = DAGStatusStateProto.DAG_KILLED;
+        break;
+      case SUCCEEDED:
+        dagState = DAGStatusStateProto.DAG_SUCCEEDED;
+        break;
+      default:
+        throw new TezUncheckedException("Encountered unknown final application"
+          + " status from YARN"
+          + ", appState=" + appReport.getYarnApplicationState()
+          + ", finalStatus=" + appReport.getFinalApplicationStatus());
+      }
+      break;
+    default:
+      throw new TezUncheckedException("Encountered unknown application state"
+          + " from YARN, appState=" + appReport.getYarnApplicationState());
+    }
+
+    builder.setState(dagState);
+    if(appReport.getDiagnostics() != null) {
+      builder.addAllDiagnostics(Collections.singleton(appReport.getDiagnostics()));
+    }
+
+    return dagStatus;
+  }
+
+  VertexStatus getVertexStatusViaAM(String vertexName) throws TezException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("GetVertexStatus via AM for app: " + appId + " dag: " + dagId
+          + " vertex: " + vertexName);
+    }
+    GetVertexStatusRequestProto requestProto =
+        GetVertexStatusRequestProto.newBuilder().
+                        setDagId(dagId).setVertexName(vertexName).build();
+
+    try {
+      return new VertexStatus(
+                 proxy.getVertexStatus(null, requestProto).getVertexStatus());
+    } catch (ServiceException e) {
+      // TEZ-151 retrieve wrapped TezException
+      throw new TezException(e);
+    }
+  }
+
+  ApplicationReport getAppReport() throws IOException, TezException {
+    try {
+      ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+      if (LOG.isDebugEnabled()) { // report may be null; callers check for it
+        LOG.debug("App: " + appId + " in state: " + (appReport == null
+            ? "unknown" : appReport.getYarnApplicationState()));
+      }
+      return appReport;
+    } catch (YarnException e) {
+      throw new TezException(e);
+    }
+  }
+
+  boolean createAMProxyIfNeeded() throws IOException, TezException {
+    if(proxy != null) {
+      // if proxy exist optimistically use it assuming there is no retry
+      return true;
+    }
+    appReport = getAppReport();
+
+    if(appReport == null) {
+      return false;
+    }
+    YarnApplicationState appState = appReport.getYarnApplicationState();
+    if(appState != YarnApplicationState.RUNNING) {
+      return false;
+    }
+
+    // YARN-808. Cannot ascertain if AM is ready until we connect to it.
+    // workaround check the default string set by YARN
+    if(appReport.getHost() == null || appReport.getHost().equals("N/A") ||
+        appReport.getRpcPort() == 0){
+      // attempt not running
+      return false;
+    }
+
+    InetSocketAddress addr = new InetSocketAddress(appReport.getHost(),
+        appReport.getRpcPort());
+
+    RPC.setProtocolEngine(conf, DAGClientAMProtocolBlockingPB.class,
+        ProtobufRpcEngine.class);
+    proxy = (DAGClientAMProtocolBlockingPB) RPC.getProxy(
+        DAGClientAMProtocolBlockingPB.class, 0, addr, conf);
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/Event.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/Event.java b/tez-api/src/main/java/org/apache/tez/engine/api/Event.java
new file mode 100644
index 0000000..80da655
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/Event.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+/**
+ * Base class for all events generated within the Tez execution engine.
+ * Used as the primary mode of communication between the AM, Inputs, Processors
+ * and Outputs.
+ */
+public abstract class Event {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/Input.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/Input.java b/tez-api/src/main/java/org/apache/tez/engine/api/Input.java
new file mode 100644
index 0000000..e333075
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/Input.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+import java.util.List;
+
+/**
+ * Represents an input through which a TezProcessor receives data on an edge.
+ * <p>
+ *
+ * <code>Input</code> classes must have a 0 argument public constructor for Tez
+ * to construct the <code>Input</code>. Tez will take care of initializing and
+ * closing the Input after a {@link Processor} completes.
+ */
+public interface Input {
+
+  /**
+   * Initializes the <code>Input</code>.
+   *
+   * @param inputContext
+   *          the {@link TezInputContext}
+   * @return a list of {@link Event}s; presumably generated by this call
+   * @throws Exception
+   *           if an error occurs
+   */
+  public List<Event> initialize(TezInputContext inputContext)
+      throws Exception;
+
+  /**
+   * Gets an instance of the {@link Reader} for this <code>Input</code>
+   *
+   * @return the {@link Reader} for this <code>Input</code>
+   * @throws Exception
+   *           if an error occurs
+   */
+  public Reader getReader() throws Exception;
+
+  /**
+   * Handles user and system generated {@link Event}s, which typically carry
+   * information such as an output being available on the previous vertex.
+   *
+   * @param inputEvents
+   *          the list of {@link Event}s
+   */
+  public void handleEvents(List<Event> inputEvents);
+
+  /**
+   * Closes the <code>Input</code>
+   *
+   * @return a list of {@link Event}s; presumably generated by this call
+   * @throws Exception
+   *           if an error occurs
+   */
+  public List<Event> close() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/LogicalIOProcessor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/LogicalIOProcessor.java b/tez-api/src/main/java/org/apache/tez/engine/api/LogicalIOProcessor.java
new file mode 100644
index 0000000..90be09e
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/LogicalIOProcessor.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+import java.util.Map;
+
+/**
+ * Represents a processor which consumes {@link LogicalInput}s and produces
+ * {@link LogicalOutput}s
+ */
+public interface LogicalIOProcessor extends Processor {
+
+  /**
+   * Runs the {@link LogicalIOProcessor}
+   *
+   * @param inputs
+   *          a map of the source vertex name to {@link LogicalInput} - one per
+   *          incoming edge.
+   * @param outputs
+   *          a map of the destination vertex name to {@link LogicalOutput} -
+   *          one per outgoing edge
+   * @throws Exception if an error occurs
+   */
+  public void run(Map<String, LogicalInput> inputs,
+      Map<String, LogicalOutput> outputs) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/LogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/LogicalInput.java b/tez-api/src/main/java/org/apache/tez/engine/api/LogicalInput.java
new file mode 100644
index 0000000..4a47ccf
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/LogicalInput.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+/**
+ * An {@link Input} which handles all incoming physical connections on an
+ * edge. A {@link LogicalIOProcessor} sees a single Logical Input per incoming
+ * edge.
+ */
+public interface LogicalInput extends Input {
+
+  /**
+   * Sets the number of physical inputs that this <code>LogicalInput</code> will
+   * receive. This will be called by the Tez framework before initializing the
+   * <code>LogicalInput</code>
+   *
+   * @param numInputs
+   *          the number of physical inputs.
+   */
+  public void setNumPhysicalInputs(int numInputs);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/LogicalOutput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/LogicalOutput.java b/tez-api/src/main/java/org/apache/tez/engine/api/LogicalOutput.java
new file mode 100644
index 0000000..4626fbd
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/LogicalOutput.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+/**
+ * An {@link Output} which handles all outgoing physical connections on an
+ * edge. A {@link LogicalIOProcessor} sees a single Logical Output per outgoing
+ * edge.
+ */
+public interface LogicalOutput extends Output {
+  /**
+   * Sets the number of physical outputs that this <code>LogicalOutput</code>
+   * will handle. This will be called by the Tez framework before initializing
+   * the <code>LogicalOutput</code>
+   *
+   * @param numOutputs
+   *          the number of physical outputs
+   */
+  public void setNumPhysicalOutputs(int numOutputs);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/Output.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/Output.java b/tez-api/src/main/java/org/apache/tez/engine/api/Output.java
new file mode 100644
index 0000000..ec679ed
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/Output.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+import java.util.List;
+
+/**
+ * Represents an Output through which a TezProcessor writes information on an
+ * edge.
+ * <p>
+ * <code>Output</code> implementations must have a 0 argument public constructor
+ * for Tez to construct the <code>Output</code>. Tez will take care of
+ * initializing and closing the Output after a {@link Processor} completes.
+ */
+public interface Output {
+
+  /**
+   * Initializes the <code>Output</code>
+   *
+   * @param outputContext
+   *          the {@link TezOutputContext}
+   * @return a list of {@link Event}s; presumably generated by this call
+   * @throws Exception
+   *           if an error occurs
+   */
+  public List<Event> initialize(TezOutputContext outputContext)
+      throws Exception;
+
+  /**
+   * Gets an instance of the {@link Writer} in an <code>Output</code>
+   *
+   * @return the {@link Writer} for this <code>Output</code>
+   * @throws Exception
+   *           if an error occurs
+   */
+  public Writer getWriter() throws Exception;
+
+  /**
+   * Handles user and system generated {@link Event}s, which typically carry
+   * information such as a downstream vertex being ready to consume input.
+   *
+   * @param outputEvents
+   *          the list of {@link Event}s
+   */
+  public void handleEvents(List<Event> outputEvents);
+
+  /**
+   * Closes the <code>Output</code>
+   *
+   * @return a list of {@link Event}s; presumably generated by this call
+   * @throws Exception
+   *           if an error occurs
+   */
+  public List<Event> close() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/Processor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/Processor.java b/tez-api/src/main/java/org/apache/tez/engine/api/Processor.java
new file mode 100644
index 0000000..05e6b84
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/Processor.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link Processor} represents the <em>Tez</em> entity responsible for
+ * consuming {@link Input} and producing {@link Output}.
+ */
+public interface Processor {
+
+  /**
+   * Initializes the <code>Processor</code>
+   *
+   * @param processorContext the {@link TezProcessorContext}
+   * @throws Exception
+   *           if an error occurs
+   */
+  public void initialize(TezProcessorContext processorContext)
+      throws Exception;
+
+  /**
+   * Handles user and system generated {@link Event}s.
+   *
+   * @param processorEvents
+   *          the list of {@link Event}s
+   */
+  public void handleEvents(List<Event> processorEvents);
+
+  /**
+   * Closes the <code>Processor</code>
+   *
+   * @throws Exception
+   *           if an error occurs
+   */
+  public void close() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/Reader.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/Reader.java b/tez-api/src/main/java/org/apache/tez/engine/api/Reader.java
new file mode 100644
index 0000000..502c5f2
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/Reader.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+/**
+ * A <code>Reader</code> represents the data being read in an {@link Input}
+ */
+public interface Reader {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/TezInputContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/TezInputContext.java b/tez-api/src/main/java/org/apache/tez/engine/api/TezInputContext.java
new file mode 100644
index 0000000..ddf1ff8
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/TezInputContext.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+/**
+ * Context handle for the Input to initialize itself.
+ */
+public interface TezInputContext extends TezTaskContext {
+
+  /**
+   * Get the Vertex Name of the Source that generated data for this Input
+   * @return Name of the Source Vertex
+   */
+  public String getSourceVertexName();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/TezOutputContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/TezOutputContext.java b/tez-api/src/main/java/org/apache/tez/engine/api/TezOutputContext.java
new file mode 100644
index 0000000..791a0f0
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/TezOutputContext.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+/**
+ * Context handle for the Output to initialize itself.
+ */
+public interface TezOutputContext extends TezTaskContext {
+
+  /**
+   * Get the Vertex Name of the Destination that is the recipient of this
+   * Output's data
+   * @return Name of the Destination Vertex
+   */
+  public String getDestinationVertexName();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/TezProcessorContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/TezProcessorContext.java b/tez-api/src/main/java/org/apache/tez/engine/api/TezProcessorContext.java
new file mode 100644
index 0000000..2bbbe81
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/TezProcessorContext.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+import java.io.IOException;
+
+/**
+ * Context handle for the Processor to initialize itself.
+ */
+public interface TezProcessorContext extends TezTaskContext {
+
+  /**
+   * Set the overall progress of this Task Attempt
+   * @param progress Progress in the range from [0.0 - 1.0f]
+   */
+  public void setProgress(float progress);
+
+  /**
+   * Check whether this attempt can commit its output
+   * @return true if commit allowed
+   * @throws IOException
+   */
+  public boolean canCommit() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/TezTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/TezTaskContext.java b/tez-api/src/main/java/org/apache/tez/engine/api/TezTaskContext.java
new file mode 100644
index 0000000..706e646
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/TezTaskContext.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.TezCounters;
+
+/**
+ * Base interface for Context classes used to initialize the Input, Output
+ * and Processor instances.
+ */
+public interface TezTaskContext {
+
+  // TODO NEWTEZ
+  // Scale the maximum events we fetch per RPC call to mitigate OOM issues
+  // on the ApplicationMaster when a thundering herd of reducers fetch events
+  // This should not be necessary after HADOOP-8942
+
+  /**
+   * Get the {@link ApplicationId} for the running app
+   * @return the {@link ApplicationId}
+   */
+  public ApplicationId getApplicationId();
+
+  /**
+   * Get the current DAG Attempt Number
+   * @return DAG Attempt Number
+   */
+  public int getDAGAttemptNumber();
+
+  /**
+   * Get the index of this Task
+   * @return Task Index
+   */
+  public int getTaskIndex();
+
+  /**
+   * Get the current Task Attempt Number
+   * @return Task Attempt Number
+   */
+  public int getTaskAttemptNumber();
+
+  /**
+   * Get the name of the DAG
+   * @return the DAG name
+   */
+  public String getDAGName();
+
+  /**
+   * Get the name of the Vertex in which the task is running
+   * @return Vertex Name
+   */
+  public String getTaskVertexName();
+
+  public TezCounters getCounters();
+
+  /**
+   * Send Events to the AM and/or dependent Vertices
+   * @param events Events to be sent
+   */
+  public void sendEvents(List<Event> events);
+
+  /**
+   * Get the User Payload for the Input/Output/Processor
+   * @return User Payload
+   */
+  public byte[] getUserPayload();
+
+  /**
+   * Get the work directories for the Input/Output/Processor
+   * @return an array of work dirs
+   */
+  public String[] getWorkDirs();
+
+  /**
+   * Returns an identifier which is unique to the specific Input, Processor or
+   * Output
+   *
+   * @return the unique identifier
+   */
+  public String getUniqueIdentifier();
+
+  /**
+   * Report a fatal error to the framework. This will cause the entire task to
+   * fail and should not be used for reporting temporary or recoverable errors
+   * @param exception an exception representing the error
+   * @param message a message accompanying the reported error
+   */
+  public void fatalError(Throwable exception, String message);
+
+  /**
+   * Returns meta-data for the specified service. As an example, when the MR
+   * ShuffleHandler is used - this would return the jobToken serialized as bytes
+   *
+   * @param serviceName
+   *          the name of the service for which meta-data is required
+   * @return a ByteBuffer representing the meta-data
+   */
+  public ByteBuffer getServiceConsumerMetaData(String serviceName);
+
+  /**
+   * Return Provider meta-data for the specified service. As an example, when the
+   * MR ShuffleHandler is used - this would return the shuffle port serialized
+   * as bytes
+   *
+   * @param serviceName
+   *          the name of the service for which provider meta-data is required
+   * @return a ByteBuffer representing the meta-data
+   */
+  public ByteBuffer getServiceProviderMetaData(String serviceName);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/Writer.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/Writer.java b/tez-api/src/main/java/org/apache/tez/engine/api/Writer.java
new file mode 100644
index 0000000..c9503a3
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/Writer.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+/**
+ * A <code>Writer</code> represents the data being written by an {@link Output}
+ */
+public interface Writer {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/events/DataMovementEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/events/DataMovementEvent.java b/tez-api/src/main/java/org/apache/tez/engine/api/events/DataMovementEvent.java
new file mode 100644
index 0000000..3f35555
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/events/DataMovementEvent.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.events;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.engine.api.Event;
+
+/**
+ * Event used by user code to send information between tasks. An output can
+ * generate an Event of this type to send information regarding output data
+ * ( such as URI for file-based output data, port info in case of
+ * streaming-based data transfers ) to the Input on the destination vertex.
+ */
+public final class DataMovementEvent extends Event {
+
+  /**
+   * Index(i) of the i-th (physical) Input or Output that generated an Event.
+   * For a Processor-generated event, this is ignored.
+   */
+  private final int sourceIndex;
+
+  /**
+   * Index(i) of the i-th (physical) Input or Output that is meant to receive
+   * this Event. For a Processor event, this is ignored.
+   */
+  private int targetIndex;
+
+  /**
+   * User Payload for this Event
+   */
+  private final byte[] userPayload;
+
+  /**
+   * Version number to indicate what attempt generated this Event
+   */
+  private int version;
+
+  /**
+   * User Event constructor
+   * @param sourceIndex Index to identify the physical edge of the input/output
+   * that generated the event
+   * @param userPayload User Payload of the User Event
+   */
+  public DataMovementEvent(int sourceIndex,
+      byte[] userPayload) {
+    this.userPayload = userPayload;
+    this.sourceIndex = sourceIndex;
+  }
+
+  @Private
+  public DataMovementEvent(int sourceIndex,
+      int targetIndex,
+      byte[] userPayload) {
+    this.userPayload = userPayload;
+    this.sourceIndex = sourceIndex;
+    this.targetIndex = targetIndex;
+  }
+
+  /**
+   * Constructor for Processor-generated User Events
+   * @param userPayload
+   */
+  public DataMovementEvent(byte[] userPayload) {
+    this(-1, userPayload);
+  }
+
+  public byte[] getUserPayload() {
+    return userPayload;
+  }
+
+  public int getSourceIndex() {
+    return sourceIndex;
+  }
+
+  public int getTargetIndex() {
+    return targetIndex;
+  }
+
+  @Private
+  public void setTargetIndex(int targetIndex) {
+    this.targetIndex = targetIndex;
+  }
+
+  public int getVersion() {
+    return version;
+  }
+
+  @Private
+  public void setVersion(int version) {
+    this.version = version;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/events/InputFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/events/InputFailedEvent.java b/tez-api/src/main/java/org/apache/tez/engine/api/events/InputFailedEvent.java
new file mode 100644
index 0000000..57de09b
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/events/InputFailedEvent.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.events;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.engine.api.Event;
+
+/**
+ * Event sent from the AM to an Input to indicate that one of its sources has
+ * failed - effectively the input is no longer available from the particular
+ * source.
+ * Users are not expected to send this event.
+ */
+public class InputFailedEvent extends Event{
+
+  /**
+   * Index(i) of the i-th (physical) Input or Output that generated the data.
+   * For a Processor-generated event, this is ignored.
+   */
+  private final int sourceIndex;
+
+  /**
+   * Index(i) of the i-th (physical) Input or Output that is meant to receive
+   * this Event. For a Processor event, this is ignored.
+   */
+  private int targetIndex;
+
+  /**
+   * Version number to indicate what attempt generated this Event
+   */
+  private int version;
+
+  /**
+   * Constructs an InputFailedEvent for the given source index.
+   *
+   * @param sourceIndex Index to identify the physical edge of the input/output
+   * that generated the event
+   */
+  public InputFailedEvent(int sourceIndex) {
+    this.sourceIndex = sourceIndex;
+  }
+
+  @Private
+  public InputFailedEvent(int sourceIndex,
+      int targetIndex,
+      int version) {
+    this.sourceIndex = sourceIndex;
+    this.targetIndex = targetIndex;
+    this.version = version;
+  }
+
+  public int getSourceIndex() {
+    return sourceIndex;
+  }
+
+  public int getTargetIndex() {
+    return targetIndex;
+  }
+
+  @Private
+  public void setTargetIndex(int targetIndex) {
+    this.targetIndex = targetIndex;
+  }
+
+  public int getVersion() {
+    return version;
+  }
+
+  @Private
+  public void setVersion(int version) {
+    this.version = version;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/events/InputInformationEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/events/InputInformationEvent.java b/tez-api/src/main/java/org/apache/tez/engine/api/events/InputInformationEvent.java
new file mode 100644
index 0000000..3656d45
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/events/InputInformationEvent.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.events;
+
+import org.apache.tez.engine.api.Event;
+
+/**
+ * Event used to send user specific data from the user 
+ * code in the AM to the task input
+ */
+public class InputInformationEvent extends Event {
+
+  /**
+   * User Payload for this Event
+   */
+  private final byte[] userPayload;
+  public InputInformationEvent(byte[] userPayload) {
+    this.userPayload = userPayload;
+  }
+
+  public byte[] getUserPayload() {
+    return userPayload;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/api/events/InputReadErrorEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/api/events/InputReadErrorEvent.java b/tez-api/src/main/java/org/apache/tez/engine/api/events/InputReadErrorEvent.java
new file mode 100644
index 0000000..fa49b79
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/api/events/InputReadErrorEvent.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.events;
+
+import org.apache.tez.engine.api.Event;
+
+/**
+ * Event generated by an Input to indicate error when trying to retrieve data.
+ * This is not necessarily a fatal event - it's an indication to the AM to retry
+ * source data generation.
+ */
+public final class InputReadErrorEvent extends Event {
+
+  /**
+   * Diagnostics/trace of the error that occurred on the Input's edge.
+   */
+  private final String diagnostics;
+
+  /**
+   * Index of the physical edge on which the error occurred.
+   */
+  private final int index;
+
+  /**
+   * Version of the data on which the error occurred.
+   */
+  private final int version;
+
+  public InputReadErrorEvent(String diagnostics, int index,
+      int version) {
+    super();
+    this.diagnostics = diagnostics;
+    this.index = index;
+    this.version = version;
+  }
+
+  public String getDiagnostics() {
+    return diagnostics;
+  }
+
+  public int getIndex() {
+    return index;
+  }
+
+  public int getVersion() {
+    return version;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectLifeCycle.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectLifeCycle.java b/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectLifeCycle.java
new file mode 100644
index 0000000..7099299
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectLifeCycle.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.objectregistry;
+
+/**
+ * Defines the valid lifecycle and scope of Objects stored in ObjectRegistry.
+ * Objects are guaranteed to not be valid outside of their defined life-cycle
+ * period. Objects are not guaranteed to be retained through the defined period
+ * as they may be evicted for various reasons.
+ */
+public enum ObjectLifeCycle {
+  /** Objects are valid for the lifetime of the Tez JVM/Session
+   */
+  SESSION,
+  /** Objects are valid for the lifetime of the DAG.
+   */
+  DAG,
+  /** Objects are valid for the lifetime of the Vertex.
+   */
+  VERTEX,
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistry.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistry.java b/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistry.java
new file mode 100644
index 0000000..a27903d
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistry.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.objectregistry;
+
+/**
+ * Preliminary version of a simple shared object cache to re-use
+ * objects across multiple tasks within the same container/JVM.
+ */
+public interface ObjectRegistry {
+
+  /**
+   * Insert or update object into the registry. This will remove an object
+   * associated with the same key with a different life-cycle as there is only
+   * one instance of an Object stored for a given key irrespective of the
+   * life-cycle attached to the Object.
+   * @param lifeCycle What life-cycle is the Object valid for
+   * @param key Key to identify the Object
+   * @param value Object to be inserted
+   * @return Previous Object associated with the key attached if present
+   * else null. Could return the same object if the object was associated with
+   * the same key for a different life-cycle.
+   */
+  public Object add(ObjectLifeCycle lifeCycle, String key, Object value);
+
+  /**
+   * Return the object associated with the provided key
+   * @param key Key to find object
+   * @return Object if found else null
+   */
+  public Object get(String key);
+
+  /**
+   * Delete the object associated with the provided key
+   *
+   * @param key Key to find object
+   * @return True if an object was found and removed
+   */
+  public boolean delete(String key);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryFactory.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryFactory.java b/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryFactory.java
new file mode 100644
index 0000000..94352b3
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/engine/common/objectregistry/ObjectRegistryFactory.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.objectregistry;
+
+import com.google.inject.Inject;
+
+public class ObjectRegistryFactory {
+
+  @Inject
+  private static ObjectRegistry objectRegistry;
+
+  public static ObjectRegistry getObjectRegistry() {
+    return objectRegistry;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto
new file mode 100644
index 0000000..4385749
--- /dev/null
+++ b/tez-api/src/main/proto/DAGApiRecords.proto
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.dag.api.records";
+option java_outer_classname = "DAGProtos";
+option java_generate_equals_and_hash = true;
+
+// DAG plan messages
+
+// Many of these types have a dual in the Tez-api.  To reduce confusion, these types have prefix or suffix 
+// of "Plan" to indicate they are to be used in the dag-plan.
+// The big types use a suffix:  JobPlan, VertexPlan, EdgePlan 
+//   --> these get more direct use in the runtime and the naming is natural.
+// The enums and utility types use prefix: PlanVertexType, PlanEdgeDataMovementType, etc
+//   --> there is no great naming choice for these that avoids ambiguity, but this one seems acceptable.
+
+enum PlanVertexType {
+  INPUT = 0;
+  NORMAL = 1;
+  OUTPUT = 2;
+}
+
+enum PlanEdgeDataMovementType {
+  ONE_TO_ONE = 0;
+  BROADCAST = 1;
+  SCATTER_GATHER = 2;
+}
+
+enum PlanEdgeDataSourceType {
+  PERSISTED = 0;
+  PERSISTED_RELIABLE = 1;
+  EPHEMERAL = 2;
+}
+
+enum PlanEdgeSchedulingType {
+  SEQUENTIAL = 0;
+  CONCURRENT = 1;
+}
+
+message PlanKeyValuePair {
+  required string key = 1;
+  required string value = 2;
+}
+
+enum PlanLocalResourceType {
+  FILE = 0;
+  ARCHIVE = 1;
+  PATTERN = 2;
+}
+
+enum PlanLocalResourceVisibility {
+  PUBLIC = 0;
+  PRIVATE = 1;
+  APPLICATION = 2;
+}
+
+message PlanLocalResource {
+  required string name = 1;
+  required string uri = 2;
+  required int64 size = 3;
+  required int64 timeStamp = 4;
+  required PlanLocalResourceType type = 5;
+  required PlanLocalResourceVisibility visibility = 6;
+  optional string pattern = 7; // only used if type=PATTERN
+}
+
+// Each taskLocationHint represents a single split in the input.
+// It is the list of [{rack,machines}] that host a replica of each particular split.
+// For now it is represented as pair-of-arrays rather than array-of-pairs.
+message PlanTaskLocationHint {
+  repeated string rack = 1;
+  repeated string host = 2;
+}
+
+message PlanTaskConfiguration {
+  required int32 numTasks = 1;
+  required int32 memoryMb = 2;
+  required int32 virtualCores = 3;
+  required string javaOpts = 4;
+  required string taskModule = 5;
+  repeated PlanLocalResource localResource = 6;
+  repeated PlanKeyValuePair environmentSetting = 8;  
+}
+
+message TezEntityDescriptorProto {
+  optional string class_name = 1;
+  optional bytes user_payload = 2;
+}
+
+message VertexPlan {
+  required string name = 1;
+  required PlanVertexType type = 2;
+  optional TezEntityDescriptorProto processor_descriptor = 3;
+  required PlanTaskConfiguration taskConfig = 4;
+  repeated PlanTaskLocationHint taskLocationHint = 7;
+  repeated string inEdgeId = 8;
+  repeated string outEdgeId = 9;
+}
+
+message EdgePlan {
+  required string id = 1;
+  required string inputVertexName = 2;
+  required string outputVertexName = 3;
+  required PlanEdgeDataMovementType dataMovementType = 4;
+  required PlanEdgeDataSourceType dataSourceType = 5;
+  required PlanEdgeSchedulingType schedulingType = 6;
+  optional TezEntityDescriptorProto edge_source = 7;
+  optional TezEntityDescriptorProto edge_destination = 8;
+}
+
+message ConfigurationProto {
+  repeated PlanKeyValuePair confKeyValues = 1;
+}
+
+message DAGPlan {
+  required string name = 1;
+  repeated VertexPlan vertex = 2;
+  repeated EdgePlan edge = 3;
+  optional ConfigurationProto dagKeyValues = 4;
+}
+
+// DAG monitoring messages
+message ProgressProto {
+  optional int32 totalTaskCount = 1;
+  optional int32 succeededTaskCount = 2;
+  optional int32 runningTaskCount = 3;
+  optional int32 failedTaskCount = 4;
+  optional int32 killedTaskCount = 5;
+}
+
+enum VertexStatusStateProto {
+  VERTEX_INITED = 0;
+  VERTEX_RUNNING = 1;
+  VERTEX_SUCCEEDED = 2;
+  VERTEX_KILLED = 3;
+  VERTEX_FAILED = 4;
+  VERTEX_ERROR = 5;
+  VERTEX_TERMINATING = 6;
+}
+
+message VertexStatusProto {
+  optional VertexStatusStateProto state = 1;
+  repeated string diagnostics = 2;
+  optional ProgressProto progress = 3;
+}
+
+enum DAGStatusStateProto {
+  DAG_SUBMITTED = 0;
+  DAG_INITING = 1;
+  DAG_RUNNING = 2;
+  DAG_SUCCEEDED = 3;
+  DAG_KILLED = 4;
+  DAG_FAILED = 5;
+  DAG_ERROR = 6;
+  DAG_TERMINATING = 7;
+}
+
+message StringProgressPairProto {
+  required string key = 1;
+  required ProgressProto progress = 2;
+}
+
+message DAGStatusProto {
+  optional DAGStatusStateProto state = 1;
+  repeated string diagnostics = 2;
+  optional ProgressProto DAGProgress = 3;
+  repeated StringProgressPairProto vertexProgress = 4;  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/proto/DAGClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/DAGClientAMProtocol.proto b/tez-api/src/main/proto/DAGClientAMProtocol.proto
new file mode 100644
index 0000000..6fcd1f8
--- /dev/null
+++ b/tez-api/src/main/proto/DAGClientAMProtocol.proto
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.dag.api.client.rpc";
+option java_outer_classname = "DAGClientAMProtocolRPC";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+//import "DAGClientAMProtocolRecords.proto";
+
+import "DAGApiRecords.proto";
+
+message GetAllDAGsRequestProto {
+}
+
+message GetAllDAGsResponseProto {
+  repeated string dagId = 1;
+}
+
+message GetDAGStatusRequestProto {
+  optional string dagId = 1;
+}
+
+message GetDAGStatusResponseProto {
+  optional DAGStatusProto dagStatus = 1;
+}
+
+message GetVertexStatusRequestProto {
+  optional string dagId = 1;
+  optional string vertexName = 2;
+}
+
+message GetVertexStatusResponseProto {
+  optional VertexStatusProto vertexStatus = 1;
+}
+
+message TryKillDAGRequestProto {
+  optional string dagId = 1;
+}
+
+message TryKillDAGResponseProto {
+  //nothing yet
+}
+
+// Request message of the submitDAG rpc; carries the DAGPlan to submit.
+// NOTE(review): the field is spelled d_a_g_plan presumably so that protoc's
+// underscore-to-camel-case mapping produces DAGPlan-cased Java accessors
+// (e.g. getDAGPlan) -- confirm before renaming, callers depend on it.
+message SubmitDAGRequestProto {
+  optional DAGPlan d_a_g_plan = 1;
+}
+
+message SubmitDAGResponseProto {
+  optional string dagId = 1;
+}
+
+message ShutdownSessionRequestProto {
+}
+
+message ShutdownSessionResponseProto {
+}
+
+service DAGClientAMProtocol {
+  rpc getAllDAGs (GetAllDAGsRequestProto) returns (GetAllDAGsResponseProto);
+  rpc getDAGStatus (GetDAGStatusRequestProto) returns (GetDAGStatusResponseProto);
+  rpc getVertexStatus (GetVertexStatusRequestProto) returns (GetVertexStatusResponseProto);
+  rpc tryKillDAG (TryKillDAGRequestProto) returns (TryKillDAGResponseProto);
+  rpc submitDAG (SubmitDAGRequestProto) returns (SubmitDAGResponseProto);
+  rpc shutdownSession (ShutdownSessionRequestProto) returns (ShutdownSessionResponseProto);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-api/src/main/proto/Events.proto b/tez-api/src/main/proto/Events.proto
new file mode 100644
index 0000000..21cacf6
--- /dev/null
+++ b/tez-api/src/main/proto/Events.proto
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.engine.api.events";
+option java_outer_classname = "EventProtos";
+option java_generate_equals_and_hash = true;
+
+message DataMovementEventProto {
+  optional int32 source_index = 1;
+  optional int32 target_index = 2;
+  optional bytes user_payload = 3;
+  optional int32 version = 4;
+}
+
+message InputReadErrorEventProto {
+  optional int32 index = 1;
+  optional string diagnostics = 2;
+  optional int32 version = 3;
+}
+
+// Field numbers here run 1, 2, 4: tag 3 is not assigned in this message.
+// NOTE(review): presumably this keeps `version` on the same tag (4) it has
+// in DataMovementEventProto -- confirm before assigning tag 3 to a new field.
+message InputFailedEventProto {
+  optional int32 source_index = 1;
+  optional int32 target_index = 2;
+  optional int32 version = 4;
+}
+
+message InputInformationEventProto {
+  optional bytes user_payload = 1;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
new file mode 100644
index 0000000..53ec357
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
+import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
+import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
+import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+// based on TestDAGLocationHint
+public class TestDAGPlan {
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder(); //TODO: doesn't seem to be deleting this folder automatically as expected.
+
+  /**
+   * Writes a hand-built {@link DAGPlan} to a file in the temporary folder,
+   * parses it back and checks that the parsed plan equals the written one.
+   *
+   * @throws IOException if the plan file cannot be created, written or read
+   */
+  @Test
+  public void testBasicJobPlanSerde() throws IOException {
+
+    DAGPlan job = DAGPlan.newBuilder()
+       .setName("test")
+       .addVertex(
+           VertexPlan.newBuilder()
+             .setName("vertex1")
+             .setType(PlanVertexType.NORMAL)
+             .addTaskLocationHint(PlanTaskLocationHint.newBuilder().addHost("machineName").addRack("rack1").build())
+             .setTaskConfig(
+                 PlanTaskConfiguration.newBuilder()
+                   .setNumTasks(2)
+                   .setVirtualCores(4)
+                   .setMemoryMb(1024)
+                   .setJavaOpts("")
+                   .setTaskModule("x.y")
+                   .build())
+             .build())
+        .build();
+    File file = tempFolder.newFile("jobPlan");
+    FileOutputStream outStream = null;
+    try {
+      outStream = new FileOutputStream(file);
+      job.writeTo(outStream);
+    }
+    finally {
+      if(outStream != null){
+        outStream.close();
+      }
+    }
+
+    DAGPlan inJob;
+    FileInputStream inputStream = null;
+    try {
+      inputStream = new FileInputStream(file);
+      inJob = DAGPlan.newBuilder().mergeFrom(inputStream).build();
+    }
+    finally {
+      // Close the stream that was opened for reading. Closing outStream here
+      // (as before) left this FileInputStream open for the rest of the test.
+      if(inputStream != null){
+        inputStream.close();
+      }
+    }
+
+    Assert.assertEquals(job, inJob);
+  }
+
+  @Test
+  public void testUserPayloadSerde() {
+    DAG dag = new DAG("testDag");
+    ProcessorDescriptor pd1 = new ProcessorDescriptor("processor1").
+        setUserPayload("processor1Bytes".getBytes());
+    ProcessorDescriptor pd2 = new ProcessorDescriptor("processor2").
+        setUserPayload("processor2Bytes".getBytes());
+    Vertex v1 = new Vertex("v1", pd1, 10, Resource.newInstance(1024, 1));
+    Vertex v2 = new Vertex("v2", pd2, 1, Resource.newInstance(1024, 1));
+    v1.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
+        .setTaskLocalResources(new HashMap<String, LocalResource>());
+    v2.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
+        .setTaskLocalResources(new HashMap<String, LocalResource>());
+
+    InputDescriptor inputDescriptor = new InputDescriptor("input").
+        setUserPayload("inputBytes".getBytes());
+    OutputDescriptor outputDescriptor = new OutputDescriptor("output").
+        setUserPayload("outputBytes".getBytes());
+    Edge edge = new Edge(v1, v2, new EdgeProperty(
+        DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL, outputDescriptor, inputDescriptor));
+
+    dag.addVertex(v1).addVertex(v2).addEdge(edge);
+
+    DAGPlan dagProto = dag.createDag(new TezConfiguration());
+
+    assertEquals(2, dagProto.getVertexCount());
+    assertEquals(1, dagProto.getEdgeCount());
+
+    VertexPlan v1Proto = dagProto.getVertex(0);
+    VertexPlan v2Proto = dagProto.getVertex(1);
+    EdgePlan edgeProto = dagProto.getEdge(0);
+
+    assertEquals("processor1Bytes", new String(v1Proto.getProcessorDescriptor()
+        .getUserPayload().toByteArray()));
+    assertEquals("processor1", v1Proto.getProcessorDescriptor().getClassName());
+
+    assertEquals("processor2Bytes", new String(v2Proto.getProcessorDescriptor()
+        .getUserPayload().toByteArray()));
+    assertEquals("processor2", v2Proto.getProcessorDescriptor().getClassName());
+
+    assertEquals("inputBytes", new String(edgeProto.getEdgeDestination()
+        .getUserPayload().toByteArray()));
+    assertEquals("input", edgeProto.getEdgeDestination().getClassName());
+
+    assertEquals("outputBytes", new String(edgeProto.getEdgeSource()
+        .getUserPayload().toByteArray()));
+    assertEquals("output", edgeProto.getEdgeSource().getClassName());
+
+    EdgeProperty edgeProperty = DagTypeConverters
+        .createEdgePropertyMapFromDAGPlan(dagProto.getEdgeList().get(0));
+
+    byte[] ib = edgeProperty.getEdgeDestination().getUserPayload();
+    assertEquals("inputBytes", new String(ib));
+    assertEquals("input", edgeProperty.getEdgeDestination().getClassName());
+
+    byte[] ob = edgeProperty.getEdgeSource().getUserPayload();
+    assertEquals("outputBytes", new String(ob));
+    assertEquals("output", edgeProperty.getEdgeSource().getClassName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
new file mode 100644
index 0000000..b33f3a6
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestDAGVerify {
+
+  private final String dummyProcessorClassName = TestDAGVerify.class.getName();
+  private final String dummyInputClassName = TestDAGVerify.class.getName();
+  private final String dummyOutputClassName = TestDAGVerify.class.getName();
+  private final int dummyTaskCount = 2;
+  private final Resource dummyTaskResource = Resource.newInstance(1, 1);
+
+  //    v1
+  //    |
+  //    v2
+  @Test
+  public void testVerify1() {
+    Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor(dummyProcessorClassName),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v2 = new Vertex("v2",
+        new ProcessorDescriptor("MapProcessor"),
+        dummyTaskCount, dummyTaskResource);
+    Edge e1 = new Edge(v1, v2,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor(dummyOutputClassName),
+            new InputDescriptor(dummyInputClassName)));
+    DAG dag = new DAG("testDag");
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addEdge(e1);
+    dag.verify();
+  }
+
+  @Test(expected = IllegalStateException.class)  
+  public void testVerify2() {
+    Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor(dummyProcessorClassName),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v2 = new Vertex("v2",
+        new ProcessorDescriptor("MapProcessor"),
+        dummyTaskCount, dummyTaskResource);
+    Edge e1 = new Edge(v1, v2,
+        new EdgeProperty(DataMovementType.ONE_TO_ONE, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor(dummyOutputClassName),
+            new InputDescriptor(dummyInputClassName)));
+    DAG dag = new DAG("testDag");
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addEdge(e1);
+    dag.verify();
+  }
+
+  @Test(expected = IllegalStateException.class)  
+  public void testVerify3() {
+    Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor(dummyProcessorClassName),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v2 = new Vertex("v2",
+        new ProcessorDescriptor("MapProcessor"),
+        dummyTaskCount, dummyTaskResource);
+    Edge e1 = new Edge(v1, v2,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.EPHEMERAL, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor(dummyOutputClassName),
+            new InputDescriptor(dummyInputClassName)));
+    DAG dag = new DAG("testDag");
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addEdge(e1);
+    dag.verify();
+  }
+
+  @Test(expected = IllegalStateException.class)  
+  public void testVerify4() {
+    Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor(dummyProcessorClassName),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v2 = new Vertex("v2",
+        new ProcessorDescriptor("MapProcessor"),
+        dummyTaskCount, dummyTaskResource);
+    Edge e1 = new Edge(v1, v2,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.EPHEMERAL, SchedulingType.CONCURRENT, 
+            new OutputDescriptor(dummyOutputClassName),
+            new InputDescriptor(dummyInputClassName)));
+    DAG dag = new DAG("testDag");
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addEdge(e1);
+    dag.verify();
+  }
+
+  //    v1 <----
+  //      |     ^
+  //       v2   ^
+  //      |  |  ^
+  //    v3    v4
+  @Test
+  public void testCycle1() {
+    IllegalStateException ex=null;
+    Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor("MapProcessor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v2 = new Vertex("v2",
+        new ProcessorDescriptor("MapProcessor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v3 = new Vertex("v3",
+        new ProcessorDescriptor("MapProcessor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v4 = new Vertex("v4",
+        new ProcessorDescriptor("MapProcessor"),
+        dummyTaskCount, dummyTaskResource);
+    Edge e1 = new Edge(v1, v2,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")));
+    Edge e2 = new Edge(v2, v3,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")));
+    Edge e3 = new Edge(v2, v4,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")));
+    Edge e4 = new Edge(v4, v1,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")));
+    DAG dag = new DAG("testDag");
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addVertex(v4);
+    dag.addEdge(e1);
+    dag.addEdge(e2);
+    dag.addEdge(e3);
+    dag.addEdge(e4);
+    try{
+      dag.verify();
+    }
+    catch (IllegalStateException e){
+      ex = e;
+    }
+    Assert.assertNotNull(ex);
+    System.out.println(ex.getMessage());
+    Assert.assertTrue(ex.getMessage().startsWith("DAG contains a cycle"));
+  }
+
+  //     v1
+  //      |
+  //    -> v2
+  //    ^  | |
+  //    v3    v4
+  @Test
+  public void testCycle2() {
+    IllegalStateException ex=null;
+    Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor("MapProcessor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v2 = new Vertex("v2",
+        new ProcessorDescriptor("MapProcessor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v3 = new Vertex("v3",
+        new ProcessorDescriptor("MapProcessor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v4 = new Vertex("v4",
+        new ProcessorDescriptor("MapProcessor"),
+        dummyTaskCount, dummyTaskResource);
+    Edge e1 = new Edge(v1, v2,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")));
+    Edge e2 = new Edge(v2, v3,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")));
+    Edge e3 = new Edge(v2, v4,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")));
+    Edge e4 = new Edge(v3, v2,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")));
+    DAG dag = new DAG("testDag");
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addVertex(v4);
+    dag.addEdge(e1);
+    dag.addEdge(e2);
+    dag.addEdge(e3);
+    dag.addEdge(e4);
+    try{
+      dag.verify();
+    }
+    catch (IllegalStateException e){
+      ex = e;
+    }
+    Assert.assertNotNull(ex);
+    System.out.println(ex.getMessage());
+    Assert.assertTrue(ex.getMessage().startsWith("DAG contains a cycle"));
+  }
+
+  @Test
+  public void repeatedVertexName() {
+    IllegalStateException ex=null;
+    Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor("MapProcessor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v1repeat = new Vertex("v1",
+        new ProcessorDescriptor("MapProcessor"),
+        dummyTaskCount, dummyTaskResource);
+    DAG dag = new DAG("testDag");
+    dag.addVertex(v1);
+    dag.addVertex(v1repeat);
+    try {
+      dag.verify();
+    }
+    catch (IllegalStateException e){
+      ex = e;
+    }
+    Assert.assertNotNull(ex);
+    System.out.println(ex.getMessage());
+    Assert.assertTrue(ex.getMessage().startsWith("DAG contains multiple vertices with name"));
+  }
+
+  //  v1  v2
+  //   |  |
+  //    v3
+  @Test
+  public void BinaryInputDisallowed() {
+    IllegalStateException ex=null;
+    try {
+      Vertex v1 = new Vertex("v1",
+          new ProcessorDescriptor("MapProcessor"),
+          dummyTaskCount, dummyTaskResource);
+      Vertex v2 = new Vertex("v2",
+          new ProcessorDescriptor("MapProcessor"),
+          dummyTaskCount, dummyTaskResource);
+      Vertex v3 = new Vertex("v3",
+          new ProcessorDescriptor("ReduceProcessor"),
+          dummyTaskCount, dummyTaskResource);
+      Edge e1 = new Edge(v1, v3,
+          new EdgeProperty(DataMovementType.ONE_TO_ONE, 
+              DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+              new OutputDescriptor("dummy output class"),
+              new InputDescriptor("dummy input class")));
+      Edge e2 = new Edge(v2, v3,
+          new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+              DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+              new OutputDescriptor("dummy output class"),
+              new InputDescriptor("dummy input class")));
+      DAG dag = new DAG("testDag");
+      dag.addVertex(v1);
+      dag.addVertex(v2);
+      dag.addVertex(v3);
+      dag.addEdge(e1);
+      dag.addEdge(e2);
+      dag.verify();
+    }
+    catch (IllegalStateException e){
+      ex = e;
+    }
+    Assert.assertNotNull(ex);
+    System.out.println(ex.getMessage());
+    Assert.assertTrue(ex.getMessage().startsWith(
+        "Unsupported connection pattern on edge"));
+  }
+
+  //  v1  v2
+  //   |  |
+  //    v3
+  @Test
+  public void BinaryInputAllowed() {
+    Vertex v1 = new Vertex("v1",
+        new ProcessorDescriptor("MapProcessor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v2 = new Vertex("v2",
+        new ProcessorDescriptor("MapProcessor"),
+        dummyTaskCount, dummyTaskResource);
+    Vertex v3 = new Vertex("v3",
+        new ProcessorDescriptor("ReduceProcessor"),
+        dummyTaskCount, dummyTaskResource);
+    Edge e1 = new Edge(v1, v3,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")));
+    Edge e2 = new Edge(v2, v3,
+        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+            new OutputDescriptor("dummy output class"),
+            new InputDescriptor("dummy input class")));
+    DAG dag = new DAG("testDag");
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    dag.addEdge(e1);
+    dag.addEdge(e2);
+    dag.verify();
+  }
+
+  //   v1
+  //  |  |
+  //  v2  v3
+  /**
+   * Builds a DAG in which v1 feeds both v2 and v3 and expects
+   * {@code verify()} to reject it with a message starting with
+   * "Vertex has outDegree>1".
+   */
+  @Test
+  public void BinaryOutput() {
+    IllegalStateException ex=null;
+    try {
+      Vertex v1 = new Vertex("v1",
+          new ProcessorDescriptor("MapProcessor"),
+          dummyTaskCount, dummyTaskResource);
+      Vertex v2 = new Vertex("v2",
+          new ProcessorDescriptor("MapProcessor"),
+          dummyTaskCount, dummyTaskResource);
+      Vertex v3 = new Vertex("v3",
+          new ProcessorDescriptor("MapProcessor"),
+          dummyTaskCount, dummyTaskResource);
+      Edge e1 = new Edge(v1, v2,
+          new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+              DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+              new OutputDescriptor("dummy output class"),
+              new InputDescriptor("dummy input class")));
+      // Second output edge goes to v3, matching the diagram above; it used to
+      // duplicate the v1 -> v2 edge and leave v3 unconnected.
+      Edge e2 = new Edge(v1, v3,
+          new EdgeProperty(DataMovementType.SCATTER_GATHER, 
+              DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
+              new OutputDescriptor("dummy output class"),
+              new InputDescriptor("dummy input class")));
+      DAG dag = new DAG("testDag");
+      dag.addVertex(v1);
+      dag.addVertex(v2);
+      dag.addVertex(v3);
+      dag.addEdge(e1);
+      dag.addEdge(e2);
+      dag.verify();
+    }
+    catch (IllegalStateException e){
+      ex = e;
+    }
+    Assert.assertNotNull(ex);
+    System.out.println(ex.getMessage());
+    Assert.assertTrue(ex.getMessage().startsWith("Vertex has outDegree>1"));
+  }
+
+  @Test
+  public void testDagWithNoVertices() {
+    IllegalStateException ex=null;
+    try {
+      DAG dag = new DAG("testDag");
+      dag.verify();
+    }
+    catch (IllegalStateException e){
+      ex = e;
+    }
+    Assert.assertNotNull(ex);
+    System.out.println(ex.getMessage());
+    Assert.assertTrue(ex.getMessage()
+        .startsWith("Invalid dag containing 0 vertices"));
+  }
+
+  /**
+   * Checks that the {@code Vertex} constructor rejects invalid arguments with
+   * an {@link IllegalArgumentException}: first a parallelism of 0, then a
+   * null resource. Each case asserts on the start of the exception message.
+   */
+  @SuppressWarnings("unused")
+  @Test
+  public void testInvalidVertexConstruction() {
+    try {
+      Vertex v1 = new Vertex("v1",
+          new ProcessorDescriptor("MapProcessor"),
+          0, dummyTaskResource);
+      Assert.fail("Expected exception for 0 parallelism");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Parallelism cannot be 0"));
+    }
+    try {
+      Vertex v1 = new Vertex("v1",
+          new ProcessorDescriptor("MapProcessor"),
+          1, null);
+      // This branch exercises the null-resource check, so say so on failure.
+      Assert.fail("Expected exception for null resource");
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().startsWith("Resource cannot be null"));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-common/pom.xml
----------------------------------------------------------------------
diff --git a/tez-common/pom.xml b/tez-common/pom.xml
index 87f18d6..b4882bb 100644
--- a/tez-common/pom.xml
+++ b/tez-common/pom.xml
@@ -39,7 +39,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tez</groupId>
-      <artifactId>tez-dag-api</artifactId>
+      <artifactId>tez-api</artifactId>
     </dependency>
   </dependencies>
 


Mime
View raw message