tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject svn commit: r1469642 [20/36] - in /incubator/tez/branches/TEZ-1: ./ example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/ example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/ tez-common/src/main/ tez-common/src/main/java/ t...
Date Thu, 18 Apr 2013 23:54:28 GMT
Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,90 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.speculate;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+
+
+
+/**
+ * Contract for components that track task attempts and estimate their
+ * runtimes so that a caller can decide whether a task should be speculated.
+ */
+public interface TaskRuntimeEstimator {
+
+  /**
+   * Registers an attempt with this estimator.
+   *
+   * @param reportedStatus the status reported for the attempt
+   * @param timestamp the time associated with the report
+   *        (NOTE(review): units/epoch are not visible here - confirm)
+   */
+  void enrollAttempt(TaskAttemptStatus reportedStatus, long timestamp);
+
+  /**
+   * @param attemptID the attempt we are asking about
+   * @return the time at which the attempt was enrolled
+   *         (NOTE(review): the value for an unknown attempt is not
+   *         specified here - confirm against implementations)
+   */
+  long attemptEnrolledTime(TezTaskAttemptID attemptID);
+
+  /**
+   * Feeds a new status report for an attempt into this estimator.
+   *
+   * @param reportedStatus the latest status reported for the attempt
+   * @param timestamp the time associated with the report
+   */
+  void updateAttempt(TaskAttemptStatus reportedStatus, long timestamp);
+
+  /**
+   * Supplies the configuration and application context to this estimator.
+   *
+   * @param conf the configuration to use
+   * @param context the application context to use
+   */
+  void contextualize(Configuration conf, AppContext context);
+
+  /**
+   * Finds the maximum reasonable wallclock execution time for a task,
+   * including the time already elapsed.  If the projected total execution
+   * time for the task ever exceeds this value, we may speculate it.
+   *
+   * @param id the {@link TezTaskID} of the task we are asking about
+   * @return the task's maximum reasonable runtime, or MAX_VALUE if
+   *         we don't have enough information to rule out any runtime,
+   *         however long.
+   */
+  long thresholdRuntime(TezTaskID id);
+
+  /**
+   * Estimates a task attempt's total runtime, including the time already
+   * elapsed.
+   *
+   * @param id the {@link TezTaskAttemptID} of the attempt we are asking about
+   * @return our best estimate of the attempt's runtime, or {@code -1} if
+   *         we don't have enough information yet to produce an estimate.
+   */
+  long estimatedRuntime(TezTaskAttemptID id);
+
+  /**
+   * Estimates how long a new attempt on this task will take if we start
+   * one now.
+   *
+   * @param id the {@link TezTaskID} of the task we are asking about
+   * @return our best estimate of a new attempt's runtime, or {@code -1} if
+   *         we don't have enough information yet to produce an estimate.
+   */
+  long estimatedNewAttemptRuntime(TezTaskID id);
+
+  /**
+   * Computes the width of the error band of our estimate of the task
+   * runtime as returned by {@link #estimatedRuntime(TezTaskAttemptID)}.
+   *
+   * @param id the {@link TezTaskAttemptID} of the attempt we are asking about
+   * @return the width of the error band around the runtime estimate;
+   *         presumably {@code -1} if we don't have enough information yet,
+   *         as with {@link #estimatedRuntime(TezTaskAttemptID)} - confirm
+   *         against implementations.
+   */
+  long runtimeEstimateVariance(TezTaskAttemptID id);
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskRuntimeEstimator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskSpeculationPredicate.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskSpeculationPredicate.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskSpeculationPredicate.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskSpeculationPredicate.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.speculate;
+
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.engine.records.TezTaskID;
+
+
+/**
+ * Base predicate deciding whether a task is eligible for speculation.
+ */
+public class TaskSpeculationPredicate {
+
+  /**
+   * Accepts a task only when it currently has exactly one attempt, i.e. it
+   * rejects any task that already has speculations.
+   * Subclasses should call TaskSpeculationPredicate.canSpeculate(...), but
+   * can be even more restrictive.
+   * NOTE(review): no explicit "task is running" check is visible here.
+   *
+   * @param context application context used to look up the DAG
+   * @param taskID the task being considered
+   * @return {@code true} if the task has exactly one attempt
+   */
+  boolean canSpeculate(AppContext context, TezTaskID taskID) {
+    // TODO handle multiple dags
+    final Task candidate =
+        context.getDAG().getVertex(taskID.getVertexID()).getTask(taskID);
+    final int attemptCount = candidate.getAttempts().size();
+    return attemptCount == 1;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/TaskSpeculationPredicate.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/package-info.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/package-info.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/package-info.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/package-info.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.tez.dag.app.speculate;
+import org.apache.hadoop.classification.InterfaceAudience;

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/speculate/package-info.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskAttemptCleanupEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskAttemptCleanupEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskAttemptCleanupEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskAttemptCleanupEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.taskclean;
+
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event of type {@link TaskCleaner.EventType#TASK_CLEAN} that bundles
+ * everything needed to clean up a single task attempt.
+ */
+public class TaskAttemptCleanupEvent extends
+    AbstractEvent<TaskCleaner.EventType> {
+
+  private final TaskAttemptId attemptID;
+  private final ContainerId containerId;
+  private final OutputCommitter committer;
+  private final TaskAttemptContext attemptContext;
+
+  /**
+   * @param attemptID the attempt to clean up
+   * @param containerId the attempt's container; may be null (see
+   *        {@link #getContainerId()})
+   * @param committer the committer associated with the attempt
+   * @param attemptContext the attempt context handed to the committer
+   */
+  public TaskAttemptCleanupEvent(TaskAttemptId attemptID,
+      ContainerId containerId, OutputCommitter committer,
+      TaskAttemptContext attemptContext) {
+    super(TaskCleaner.EventType.TASK_CLEAN);
+    this.attemptContext = attemptContext;
+    this.committer = committer;
+    this.containerId = containerId;
+    this.attemptID = attemptID;
+  }
+
+  /** @return the id of the attempt to clean up */
+  public TaskAttemptId getAttemptID() {
+    return this.attemptID;
+  }
+
+  /** @return the committer associated with the attempt */
+  public OutputCommitter getCommitter() {
+    return this.committer;
+  }
+
+  /** @return the attempt context supplied at construction */
+  public TaskAttemptContext getAttemptContext() {
+    return this.attemptContext;
+  }
+
+  /**
+   * containerId could be null if the container task attempt had not started.
+   *
+   * @return the container id supplied at construction, possibly null
+   */
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskAttemptCleanupEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleaner.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleaner.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleaner.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleaner.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,30 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.taskclean;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+
+public interface TaskCleaner extends EventHandler<TaskCleanupEvent> {
+
+  enum EventType {
+    // TODO XXX Rename this event once the code is more stable.
+    TASK_CLEAN,
+    CONTAINER_COMPLETED,
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleaner.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanerContainerCompletedEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanerContainerCompletedEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanerContainerCompletedEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanerContainerCompletedEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.taskclean;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event of type {@link TaskCleaner.EventType#CONTAINER_COMPLETED} carrying
+ * the id of the container it refers to.
+ */
+public class TaskCleanerContainerCompletedEvent extends AbstractEvent<TaskCleaner.EventType> {
+
+  // Assigned once in the constructor and never changed; final keeps this
+  // event immutable like the other cleanup events in this package.
+  private final ContainerId containerId;
+
+  /**
+   * @param containerId the id of the container this event refers to
+   */
+  public TaskCleanerContainerCompletedEvent(ContainerId containerId) {
+    super(TaskCleaner.EventType.CONTAINER_COMPLETED);
+    this.containerId = containerId;
+  }
+
+  /** @return the container id supplied at construction */
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanerContainerCompletedEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanerImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanerImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanerImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanerImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,121 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.taskclean;
+
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tez.dag.app.AppContext;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Service that queues {@link TaskCleanupEvent}s and, for each one, asks the
+ * event's committer to abort the task attempt.  A single dispatcher thread
+ * takes events off the queue and hands them to a fixed pool of five worker
+ * threads; events still queued when the service stops are processed inline
+ * by {@link #stop()}.
+ */
+public class TaskCleanerImpl extends AbstractService implements TaskCleaner {
+
+  private static final Log LOG = LogFactory.getLog(TaskCleanerImpl.class);
+
+  private final AppContext context;
+  private ThreadPoolExecutor launcherPool;
+  private Thread eventHandlingThread;
+  private final BlockingQueue<TaskCleanupEvent> eventQueue =
+      new LinkedBlockingQueue<TaskCleanupEvent>();
+
+  /**
+   * @param context the application context (stored, not otherwise used in
+   *        this class)
+   */
+  public TaskCleanerImpl(AppContext context) {
+    super("TaskCleaner");
+    this.context = context;
+  }
+
+  /**
+   * Creates the worker pool and starts the dispatcher thread, then starts
+   * the underlying service.
+   */
+  @Override
+  public void start() {
+    ThreadFactory tf = new ThreadFactoryBuilder()
+      .setNameFormat("TaskCleaner #%d")
+      .build();
+    launcherPool = new ThreadPoolExecutor(5, 5, 1,
+        TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
+    eventHandlingThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (!Thread.currentThread().isInterrupted()) {
+          TaskCleanupEvent event;
+          try {
+            event = eventQueue.take();
+          } catch (InterruptedException e) {
+            LOG.error("Returning, interrupted : " + e);
+            // Restore the interrupt status that take() cleared so it is
+            // not silently lost.
+            Thread.currentThread().interrupt();
+            return;
+          }
+          // the events from the queue are handled in parallel
+          // using a thread pool
+          launcherPool.execute(new EventProcessor(event));
+        }
+      }
+    });
+    eventHandlingThread.setName("TaskCleaner Event Handler");
+    eventHandlingThread.start();
+    super.start();
+  }
+
+  /**
+   * Interrupts the dispatcher, shuts the worker pool down, and then runs the
+   * cleanup for every event still in the queue on the calling thread before
+   * stopping the underlying service.
+   */
+  @Override
+  public void stop() {
+    if (eventHandlingThread != null) {
+      eventHandlingThread.interrupt();
+    }
+    if (launcherPool != null) {
+      launcherPool.shutdown();
+    }
+    // Drain with poll() instead of iterating: each event is removed as it
+    // is handled, so it cannot also be picked up by the dispatcher and is
+    // not left behind in the queue after the service has stopped.
+    TaskCleanupEvent ev;
+    while ((ev = eventQueue.poll()) != null) {
+      LOG.info("TaskCleaner.stop: Cleanup for: " + ev.getAttemptID());
+      new EventProcessor(ev).run();
+    }
+    super.stop();
+  }
+
+  /**
+   * Runs the cleanup for one event.  Static because it needs no state from
+   * the enclosing service instance.
+   */
+  private static class EventProcessor implements Runnable {
+    private final TaskCleanupEvent event;
+
+    EventProcessor(TaskCleanupEvent event) {
+      this.event = event;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Processing the event " + event.toString());
+      try {
+        event.getCommitter().abortTask(event.getAttemptContext());
+      } catch (Exception e) {
+        // Best effort: a failed cleanup is logged and must not kill the
+        // worker or abort the remaining cleanups.
+        LOG.warn("Task cleanup failed for attempt " + event.getAttemptID(), e);
+      }
+    }
+  }
+
+  /**
+   * Queues the event for asynchronous cleanup.
+   *
+   * @param event the cleanup event to enqueue
+   * @throws YarnException if the calling thread is interrupted while
+   *         enqueueing; the thread's interrupt status is restored first
+   */
+  @Override
+  public void handle(TaskCleanupEvent event) {
+    try {
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the caller's thread; wrapping the
+      // exception alone would drop it.
+      Thread.currentThread().interrupt();
+      throw new YarnException(e);
+    }
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanupEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanupEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanupEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanupEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,58 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.taskclean;
+
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+/**
+ * Event of type {@link TaskCleaner.EventType#TASK_CLEAN} describing one task
+ * attempt to clean up, together with the committer and attempt context
+ * needed to do so.
+ */
+public class TaskCleanupEvent extends AbstractEvent<TaskCleaner.EventType> {
+  // TODO XXX: Rename to TaskAttemptCleanupEvent ?
+
+  // TODO XXX: Maybe include the containerId along with this event. Otherwise depend on events coming in from the Container to include MRxTaskAttemptIDs.
+  private final TezTaskAttemptID attemptID;
+  private final OutputCommitter committer;
+  private final TaskAttemptContext attemptContext;
+
+  /**
+   * @param attemptID the attempt to clean up
+   * @param committer the committer associated with the attempt
+   * @param attemptContext the attempt context handed to the committer
+   */
+  public TaskCleanupEvent(TezTaskAttemptID attemptID, OutputCommitter committer,
+      TaskAttemptContext attemptContext) {
+    super(TaskCleaner.EventType.TASK_CLEAN);
+    this.attemptContext = attemptContext;
+    this.committer = committer;
+    this.attemptID = attemptID;
+  }
+
+  /** @return the id of the attempt to clean up */
+  public TezTaskAttemptID getAttemptID() {
+    return this.attemptID;
+  }
+
+  /** @return the committer associated with the attempt */
+  public OutputCommitter getCommitter() {
+    return this.committer;
+  }
+
+  /** @return the attempt context supplied at construction */
+  public TaskAttemptContext getAttemptContext() {
+    return this.attemptContext;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/TaskCleanupEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/package-info.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/package-info.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/package-info.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/package-info.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.tez.dag.app.taskclean;
+import org.apache.hadoop.classification.InterfaceAudience;

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/taskclean/package-info.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo Thu Apr 18 23:54:18 2013
@@ -0,0 +1,14 @@
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+org.apache.tez.dag.app.MRClientSecurityInfo

Added: incubator/tez/branches/TEZ-1/tez-dag/src/test/java/org/apache/tez/dag/api/TestDAGLocationHint.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/test/java/org/apache/tez/dag/api/TestDAGLocationHint.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/test/java/org/apache/tez/dag/api/TestDAGLocationHint.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/test/java/org/apache/tez/dag/api/TestDAGLocationHint.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Serialization round-trip tests for {@link DAGLocationHint}.
+ */
+public class TestDAGLocationHint {
+
+  private DataInput in;
+  private DataOutput out;
+  private ByteArrayOutputStream bOut;
+
+  @Before
+  public void setup() {
+    bOut = new ByteArrayOutputStream();
+    out = new DataOutputStream(bOut);
+  }
+
+  @After
+  public void teardown() {
+    in = null;
+    out = null;
+    bOut = null;
+  }
+
+  /**
+   * Writes the given hint to the in-memory output and reads the bytes back
+   * into a fresh instance.
+   */
+  private DAGLocationHint roundTrip(DAGLocationHint original)
+      throws IOException {
+    original.write(out);
+    in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
+    DAGLocationHint copy = new DAGLocationHint();
+    copy.readFields(in);
+    return copy;
+  }
+
+  /** An empty hint must deserialize to a non-null, empty map. */
+  @Test
+  public void testNullDAGLocationHintSerDes() throws IOException {
+    DAGLocationHint actual = roundTrip(new DAGLocationHint());
+    Assert.assertNotNull(actual.getVertexLocationHints());
+    Assert.assertEquals(0, actual.getVertexLocationHints().size());
+  }
+
+  /**
+   * Round-trips a hint holding a null entry, an empty vertex hint and a
+   * four-task vertex hint whose first slot is populated.
+   */
+  @Test
+  public void testDAGLocationHintSerDes() throws IOException {
+    String[] hosts = { "h1", "h2", "", null };
+    String[] racks = { "r1", "r2" };
+
+    VertexLocationHint populated = new VertexLocationHint(4);
+    populated.getTaskLocationHints()[0] = new TaskLocationHint(hosts, racks);
+
+    DAGLocationHint expected = new DAGLocationHint();
+    expected.getVertexLocationHints().put("v1", null);
+    expected.getVertexLocationHints().put("v2", new VertexLocationHint());
+    expected.getVertexLocationHints().put("v3", populated);
+
+    DAGLocationHint actual = roundTrip(expected);
+    Assert.assertNotNull(actual.getVertexLocationHints());
+    Assert.assertEquals(3, actual.getVertexLocationHints().size());
+
+    Assert.assertNull(actual.getVertexLocationHint("v1"));
+    Assert.assertNotNull(actual.getVertexLocationHint("v2"));
+    Assert.assertNotNull(actual.getVertexLocationHint("v3"));
+
+    // v2: empty vertex hint
+    Assert.assertEquals(0, actual.getVertexLocationHint("v2").getNumTasks());
+    Assert.assertEquals(0,
+        actual.getVertexLocationHint("v2").getTaskLocationHints().length);
+
+    // v3: four slots, only the first one populated
+    Assert.assertEquals(4, actual.getVertexLocationHint("v3").getNumTasks());
+    Assert.assertEquals(4,
+        actual.getVertexLocationHint("v3").getTaskLocationHints().length);
+    Assert.assertNotNull(
+        actual.getVertexLocationHint("v3").getTaskLocationHints()[0]);
+    Assert.assertArrayEquals(racks,
+        actual.getVertexLocationHint("v3").getTaskLocationHints()[0].
+            getRacks());
+    Assert.assertArrayEquals(hosts,
+        actual.getVertexLocationHint("v3").getTaskLocationHints()[0].
+            getDataLocalHosts());
+    for (int slot = 1; slot < 4; slot++) {
+      Assert.assertNull(
+          actual.getVertexLocationHint("v3").getTaskLocationHints()[slot]);
+    }
+  }
+
+}
+

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/test/java/org/apache/tez/dag/api/TestDAGLocationHint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/test/java/org/apache/tez/dag/api/TestVertexLocationHint.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/test/java/org/apache/tez/dag/api/TestVertexLocationHint.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/test/java/org/apache/tez/dag/api/TestVertexLocationHint.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/test/java/org/apache/tez/dag/api/TestVertexLocationHint.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestVertexLocationHint {
+
+  private DataInput in;
+  private DataOutput out;
+  private ByteArrayOutputStream bOut;
+
+  @Before
+  public void setup() {
+    bOut = new ByteArrayOutputStream();
+    out = new DataOutputStream(bOut);
+  }
+
+  @After
+  public void teardown() {
+    in = null;
+    out = null;
+    bOut = null;
+  }
+
+  @Test
+  public void testNullTaskLocationHintSerDes() throws IOException {
+    TaskLocationHint expected = new TaskLocationHint(null, null);
+    expected.write(out);
+    in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
+    TaskLocationHint actual = new TaskLocationHint();
+    actual.readFields(in);
+    Assert.assertNull(actual.getDataLocalHosts());
+    Assert.assertNull(actual.getRacks());
+  }
+
+  @Test
+  public void testTaskLocationHintSerDes() throws IOException {
+    String[] hosts = { "h1", "h2", "", null };
+    String[] racks = { "r1", "r2" };
+    TaskLocationHint expected = new TaskLocationHint(hosts, racks);
+    expected.write(out);
+    in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
+    TaskLocationHint actual = new TaskLocationHint();
+    actual.readFields(in);
+    Assert.assertNotNull(actual.getDataLocalHosts());
+    Assert.assertNotNull(actual.getRacks());
+    Assert.assertArrayEquals(hosts, actual.getDataLocalHosts());
+    Assert.assertArrayEquals(racks, actual.getRacks());
+  }
+
+  @Test
+  public void testTaskLocationHintSerDes2() throws IOException {
+    String[] hosts = null;
+    String[] racks = { "r1", "r2" };
+    TaskLocationHint expected = new TaskLocationHint(hosts, racks);
+    expected.write(out);
+    in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
+    TaskLocationHint actual = new TaskLocationHint();
+    actual.readFields(in);
+    Assert.assertNull(actual.getDataLocalHosts());
+    Assert.assertNotNull(actual.getRacks());
+    Assert.assertArrayEquals(racks, actual.getRacks());
+  }
+
+  @Test
+  public void testEmptyVertexLocationHintSerDes() throws IOException {
+    VertexLocationHint expected = new VertexLocationHint(0);
+    expected.write(out);
+    in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
+    VertexLocationHint actual = new VertexLocationHint();
+    actual.readFields(in);
+    Assert.assertEquals(0, actual.getNumTasks());
+    Assert.assertNotNull(actual.getTaskLocationHints());
+    Assert.assertEquals(0, actual.getTaskLocationHints().length);
+  }
+
+  @Test
+  public void testVertexLocationHintSerDes() throws IOException {
+    String[] hosts = { "h1", "h2", "", null };
+    String[] racks = { "r1", "r2" };
+    VertexLocationHint expected = new VertexLocationHint(4);
+    expected.getTaskLocationHints()[0] = new TaskLocationHint(hosts, racks);
+    expected.getTaskLocationHints()[1] = null;
+    expected.getTaskLocationHints()[2] = new TaskLocationHint(null, racks);
+    expected.getTaskLocationHints()[3] = new TaskLocationHint(hosts, null);
+    expected.write(out);
+    in = new DataInputStream(new ByteArrayInputStream(bOut.toByteArray()));
+    VertexLocationHint actual = new VertexLocationHint();
+    actual.readFields(in);
+
+    Assert.assertEquals(4, actual.getNumTasks());
+    Assert.assertNotNull(actual.getTaskLocationHints());
+    Assert.assertEquals(4, actual.getTaskLocationHints().length);
+
+    Assert.assertNotNull(actual.getTaskLocationHints()[0]);
+    Assert.assertNull(actual.getTaskLocationHints()[1]);
+    Assert.assertNotNull(actual.getTaskLocationHints()[2]);
+    Assert.assertNotNull(actual.getTaskLocationHints()[3]);
+
+    Assert.assertArrayEquals(
+        expected.getTaskLocationHints()[0].getDataLocalHosts(),
+        actual.getTaskLocationHints()[0].getDataLocalHosts());
+    Assert.assertArrayEquals(
+        expected.getTaskLocationHints()[0].getRacks(),
+        actual.getTaskLocationHints()[0].getRacks());
+    Assert.assertNull(
+        actual.getTaskLocationHints()[2].getDataLocalHosts());
+    Assert.assertArrayEquals(
+        expected.getTaskLocationHints()[2].getRacks(),
+        actual.getTaskLocationHints()[2].getRacks());
+    Assert.assertArrayEquals(
+        expected.getTaskLocationHints()[3].getDataLocalHosts(),
+        actual.getTaskLocationHints()[3].getDataLocalHosts());
+    Assert.assertNull(
+        actual.getTaskLocationHints()[3].getRacks());
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/test/java/org/apache/tez/dag/api/TestVertexLocationHint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tez.dag.app.rm.AMRMClient.ContainerRequest;
+import org.apache.tez.dag.app.rm.TaskScheduler.CRCookie;
+import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback;
+import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.mockito.Mockito.*;
+
+public class TestTaskScheduler {
+  
+  RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+    
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test
+  public void testTaskScheduler() throws YarnRemoteException {
+    RackResolver.init(new YarnConfiguration());
+    TaskSchedulerAppCallback mockApp = mock(TaskSchedulerAppCallback.class);
+    
+    AMRMClientAsync<CRCookie> mockRMClient = mock(AMRMClientAsync.class);
+    
+    ApplicationAttemptId attemptId = 
+        BuilderUtils.newApplicationAttemptId(
+                                  BuilderUtils.newApplicationId(1234, 0), 0);
+    String appHost = "host";
+    int appPort = 0;
+    String appUrl = "url";
+    TaskScheduler scheduler = new TaskScheduler(attemptId, mockApp, appHost, 
+                                                appPort, appUrl, mockRMClient);
+    
+    Configuration conf = new Configuration(); 
+    scheduler.init(conf);
+    verify(mockRMClient).init(conf);
+    
+    RegisterApplicationMasterResponse mockRegResponse = 
+                                mock(RegisterApplicationMasterResponse.class);
+    Resource mockMinResource = mock(Resource.class);
+    Resource mockMaxResource = mock(Resource.class);
+    Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
+    when(mockRegResponse.getMinimumResourceCapability()).
+                                                   thenReturn(mockMinResource);
+    when(mockRegResponse.getMaximumResourceCapability()).
+                                                   thenReturn(mockMaxResource);
+    when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);    
+    when(mockRMClient.
+          registerApplicationMaster(anyString(), anyInt(), anyString())).
+                                                   thenReturn(mockRegResponse);
+    scheduler.start();
+    verify(mockRMClient).start();
+    verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl);
+    verify(mockApp).setApplicationRegistrationData(mockMinResource, 
+                                                   mockMaxResource, 
+                                                   mockAcls);
+    
+    when(mockRMClient.getClusterNodeCount()).thenReturn(5);
+    Assert.assertEquals(5, scheduler.getClusterNodeCount());
+    
+    Resource mockClusterResource = mock(Resource.class);
+    when(mockRMClient.getClusterAvailableResources()).
+                                              thenReturn(mockClusterResource);
+    Assert.assertEquals(mockClusterResource, 
+                        mockRMClient.getClusterAvailableResources());
+    
+    Object mockTask1 = mock(Object.class);
+    Object mockCookie1 = mock(Object.class);
+    Resource mockCapability = mock(Resource.class);
+    String[] hosts = {"host1", "host5"};
+    String[] racks = {"/default-rack", "/default-rack"};
+    Priority mockPriority = mock(Priority.class);
+    ArgumentCaptor<ContainerRequest> requestCaptor = 
+                              ArgumentCaptor.forClass(ContainerRequest.class);
+    // allocate task
+    scheduler.allocateTask(mockTask1, mockCapability, hosts, 
+                           racks, mockPriority, mockCookie1);
+    verify(mockRMClient, times(1)).
+                                addContainerRequest((ContainerRequest) any());
+
+    // deallocating before allocation happens removes the pending request
+    Assert.assertNull(scheduler.deallocateTask(mockTask1));
+    verify(mockRMClient, times(1)).
+                              removeContainerRequest((ContainerRequest) any());
+    verify(mockRMClient, times(0)).
+                                 releaseAssignedContainer((ContainerId) any());
+    
+    // deallocating unknown task
+    Assert.assertNull(scheduler.deallocateTask(mockTask1));
+    verify(mockRMClient, times(1)).
+                              removeContainerRequest((ContainerRequest) any());
+    verify(mockRMClient, times(0)).
+                                 releaseAssignedContainer((ContainerId) any());
+
+    // allocate tasks
+    Object mockTask2 = mock(Object.class);
+    Object mockCookie2 = mock(Object.class);
+    Object mockTask3 = mock(Object.class);
+    Object mockCookie3 = mock(Object.class);
+    scheduler.allocateTask(mockTask1, mockCapability, hosts, 
+        racks, mockPriority, mockCookie1);
+    verify(mockRMClient, times(2)).
+                                addContainerRequest(requestCaptor.capture());
+    ContainerRequest<CRCookie> request1 = requestCaptor.getValue();
+    scheduler.allocateTask(mockTask2, mockCapability, hosts, 
+        racks, mockPriority, mockCookie2);
+    verify(mockRMClient, times(3)).
+                                addContainerRequest(requestCaptor.capture());
+    ContainerRequest<CRCookie> request2 = requestCaptor.getValue();
+    scheduler.allocateTask(mockTask3, mockCapability, hosts, 
+        racks, mockPriority, mockCookie3);
+    verify(mockRMClient, times(4)).
+                                addContainerRequest(requestCaptor.capture());
+    ContainerRequest<CRCookie> request3 = requestCaptor.getValue();
+    
+    List<Container> containers = new ArrayList<Container>();
+    Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(mockContainer1.getNodeId().getHost()).thenReturn("host1");
+    ContainerId mockCId1 = mock(ContainerId.class);
+    when(mockContainer1.getId()).thenReturn(mockCId1);
+    containers.add(mockContainer1);
+    Container mockContainer2 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(mockContainer2.getNodeId().getHost()).thenReturn("host2");
+    ContainerId mockCId2 = mock(ContainerId.class);
+    when(mockContainer2.getId()).thenReturn(mockCId2);
+    containers.add(mockContainer2);
+    Container mockContainer3 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(mockContainer3.getNodeId().getHost()).thenReturn("host3");
+    ContainerId mockCId3 = mock(ContainerId.class);
+    when(mockContainer3.getId()).thenReturn(mockCId3);
+    containers.add(mockContainer3);
+    Container mockContainer4 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(mockContainer4.getNodeId().getHost()).thenReturn("host4");
+    ContainerId mockCId4 = mock(ContainerId.class);
+    when(mockContainer4.getId()).thenReturn(mockCId4);
+    containers.add(mockContainer4);
+    List<ContainerRequest<CRCookie>> hostContainers = 
+                                new ArrayList<ContainerRequest<CRCookie>>();
+    hostContainers.add(request1);
+    hostContainers.add(request2);
+    hostContainers.add(request3);
+    List<ContainerRequest<CRCookie>> rackContainers = 
+                                new ArrayList<ContainerRequest<CRCookie>>();
+    rackContainers.add(request2);
+    rackContainers.add(request3);
+    List<ContainerRequest<CRCookie>> anyContainers = 
+                                         Collections.singletonList(request3);
+    // return all requests for host1
+    when(
+        mockRMClient.getMatchingRequests((Priority) any(), eq("host1"),
+            (Resource) any())).thenReturn(hostContainers);
+    // first request matched by host
+    // second request matched to rack. RackResolver by default puts hosts in
+    // /default-rack. We work around this by returning rack matches only once
+    when(
+        mockRMClient.getMatchingRequests((Priority) any(),
+            eq("/default-rack"), (Resource) any())).thenReturn(
+        rackContainers).thenReturn(null);    
+    // third request matched to ANY
+    when(
+        mockRMClient.getMatchingRequests((Priority) any(),
+            eq(ResourceRequest.ANY), (Resource) any())).thenReturn(
+        anyContainers).thenReturn(null);
+    scheduler.onContainersAllocated(containers);
+    // the first three containers are allocated, one to each task
+    verify(mockApp).taskAllocated(mockTask1, mockCookie1, mockContainer1);
+    verify(mockApp).taskAllocated(mockTask2, mockCookie2, mockContainer2);
+    verify(mockApp).taskAllocated(mockTask3, mockCookie3, mockContainer3);
+    // no other allocations returned
+    verify(mockApp, times(3)).taskAllocated(any(), any(), (Container) any());
+    verify(mockRMClient).removeContainerRequest(request1);
+    verify(mockRMClient).removeContainerRequest(request2);
+    verify(mockRMClient).removeContainerRequest(request3);
+    // verify unwanted container released
+    verify(mockRMClient).releaseAssignedContainer(mockCId4);
+    
+    // deallocate allocated task
+    Assert.assertEquals(mockContainer1, scheduler.deallocateTask(mockTask1));
+    verify(mockRMClient).releaseAssignedContainer(mockCId1);
+    // deallocate allocated container
+    Assert.assertEquals(mockTask2, scheduler.deallocateContainer(mockCId2));
+    verify(mockRMClient).releaseAssignedContainer(mockCId2);
+    verify(mockRMClient, times(3)).releaseAssignedContainer((ContainerId) any());
+    
+    List<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
+    ContainerStatus mockStatus1 = mock(ContainerStatus.class);
+    when(mockStatus1.getContainerId()).thenReturn(mockCId1);
+    statuses.add(mockStatus1);
+    ContainerStatus mockStatus2 = mock(ContainerStatus.class);
+    when(mockStatus2.getContainerId()).thenReturn(mockCId2);
+    statuses.add(mockStatus2);
+    ContainerStatus mockStatus3 = mock(ContainerStatus.class);
+    when(mockStatus3.getContainerId()).thenReturn(mockCId3);
+    statuses.add(mockStatus3);
+    ContainerStatus mockStatus4 = mock(ContainerStatus.class);
+    when(mockStatus4.getContainerId()).thenReturn(mockCId4);
+    statuses.add(mockStatus4);
+    
+    scheduler.onContainersCompleted(statuses);
+    // released container status returned
+    verify(mockApp).containerCompleted(mockTask1, mockStatus1);
+    verify(mockApp).containerCompleted(mockTask2, mockStatus2);
+    // currently allocated container status returned and not released
+    verify(mockApp).containerCompleted(mockTask3, mockStatus3);
+    // no other statuses returned
+    verify(mockApp, times(3)).containerCompleted(any(), (ContainerStatus) any());
+    verify(mockRMClient, times(3)).releaseAssignedContainer((ContainerId) any());
+    
+    
+    float progress = 0.5f;
+    when(mockApp.getProgress()).thenReturn(progress);
+    Assert.assertEquals(progress, scheduler.getProgress(), 0);
+    
+    List<NodeReport> mockUpdatedNodes = mock(List.class);
+    scheduler.onNodesUpdated(mockUpdatedNodes);
+    verify(mockApp).nodesUpdated(mockUpdatedNodes);
+    
+    Exception mockException = mock(Exception.class);
+    scheduler.onError(mockException);
+    verify(mockApp).onError(mockException);
+    
+    scheduler.onRebootRequest();
+    verify(mockApp).appRebootRequested();
+    
+    String appMsg = "success";
+    AppFinalStatus finalStatus = 
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+    when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
+    scheduler.stop();
+    verify(mockRMClient).
+                  unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, 
+                                              appMsg, appUrl);
+    verify(mockRMClient).stop();
+  }
+  
+}

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/test/resources/krb5.conf
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/test/resources/krb5.conf?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/test/resources/krb5.conf (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/test/resources/krb5.conf Thu Apr 18 23:54:18 2013
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# 
+[libdefaults]
+	default_realm = APACHE.ORG
+	udp_preference_limit = 1
+	extra_addresses = 127.0.0.1
+[realms]
+	APACHE.ORG = {
+		admin_server = localhost:88
+		kdc = localhost:88
+	}
+[domain_realm]
+	localhost = APACHE.ORG

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/test/resources/krb5.conf
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dag/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/test/resources/log4j.properties?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/test/resources/log4j.properties (added)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/test/resources/log4j.properties Thu Apr 18 23:54:18 2013
@@ -0,0 +1,19 @@
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+# log4j configuration used during build and unit tests
+
+log4j.rootLogger=info,stdout
+log4j.threshold=ALL
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n

Propchange: incubator/tez/branches/TEZ-1/tez-dag/src/test/resources/log4j.properties
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dist/pom.xml
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dist/pom.xml?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dist/pom.xml (added)
+++ incubator/tez/branches/TEZ-1/tez-dist/pom.xml Thu Apr 18 23:54:18 2013
@@ -0,0 +1,111 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez</artifactId>
+    <version>0.2.0-SNAPSHOT</version>
+  </parent>
+  <groupId>org.apache.tez</groupId>
+  <artifactId>tez-dist</artifactId>
+  <version>0.2.0-SNAPSHOT</version>
+
+  <packaging>pom</packaging>
+
+  <dependencies>
+    <!--tez-yarn-client should require all other modules to be built before it, so this becomes the last -->
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-yarn-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-mapreduce-examples</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+
+  <properties>
+    <package.format>dir</package.format>
+    <!--includeBaseDirectory is not used - replacement does not work in the packaging-->
+    <package.includeBaseDirectory>false</package.includeBaseDirectory>
+  </properties>
+
+  <profiles>
+    <profile>
+      <id>dist-tar</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+        <property>
+          <name>tar</name>
+        </property>
+      </activation>
+      <properties>
+        <package.format>tar.gz</package.format>
+        <package.includeBaseDirectory>true</package.includeBaseDirectory>
+      </properties>
+    </profile>
+  </profiles>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptors>
+            <descriptor>src/main/assembly/tez-dist.xml</descriptor>
+          </descriptors>
+          <tarLongFileMode>gnu</tarLongFileMode>
+          <appendAssemblyId>false</appendAssemblyId>
+          <attach>false</attach>
+          <finalName>tez-${project.version}</finalName>
+        </configuration>
+        <executions>
+          <execution>
+            <id>package-tez</id>
+            <configuration>
+              <formats>
+                <format>${package.format}</format>
+              </formats>
+            </configuration>
+            <phase>prepare-package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

Propchange: incubator/tez/branches/TEZ-1/tez-dist/pom.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-dist/src/main/assembly/tez-dist.xml
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dist/src/main/assembly/tez-dist.xml?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dist/src/main/assembly/tez-dist.xml (added)
+++ incubator/tez/branches/TEZ-1/tez-dist/src/main/assembly/tez-dist.xml Thu Apr 18 23:54:18 2013
@@ -0,0 +1,69 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+  <id>tez-dist</id>
+  <!--includeBaseDirectory parameter replacement does not seem to work -->
+  <!--includeBaseDirectory>${package.includeBaseDirectory}</includeBaseDirectory-->
+  <!--fileSets>
+    <fileSet>
+      <directory>${project.build.directory}</directory>
+      <outputDirectory>/</outputDirectory>
+      <includes>
+        <include>${project.artifactId}-${project.version}.jar</include>
+      </includes>
+    </fileSet>
+  </fileSets-->
+  <moduleSets>
+    <moduleSet>
+      <useAllReactorProjects>true</useAllReactorProjects>
+      <includes>
+        <include>org.apache.tez:tez-dag</include>
+        <include>org.apache.tez:tez-yarn-client</include>
+      </includes>
+      <binaries>
+        <outputDirectory>/</outputDirectory>
+        <includeDependencies>false</includeDependencies>
+        <unpack>false</unpack>
+      </binaries>
+    </moduleSet>
+  </moduleSets>
+  <dependencySets>
+    <dependencySet>
+      <useProjectArtifact>false</useProjectArtifact>
+      <outputDirectory>/lib</outputDirectory>
+      <!-- Exclude hadoop artifacts. They will be found via HADOOP* env -->
+      <excludes>
+        <exclude>org.apache.hadoop:hadoop-common</exclude>
+        <exclude>org.apache.hadoop:hadoop-auth</exclude>
+        <exclude>org.apache.hadoop:hadoop-annotations</exclude>
+        <!--exclude>org.apache.hadoop:hadoop-mapreduce-client-common</exclude-->
+        <!--exclude>org.apache.hadoop:hadoop-mapreduce-client-core</exclude-->
+        <!--exclude>org.apache.hadoop:hadoop-mapreduce-client-shuffle</exclude-->
+        <exclude>org.apache.hadoop:hadoop-yarn-api</exclude>
+        <exclude>org.apache.hadoop:hadoop-yarn-client</exclude>
+        <exclude>org.apache.hadoop:hadoop-yarn-common</exclude>
+        <exclude>org.apache.hadoop:hadoop-yarn-server-common</exclude>
+        <exclude>org.apache.hadoop:hadoop-yarn-server-nodemanager</exclude>
+        <!-- use slf4j from common to avoid multiple binding warnings -->
+        <exclude>org.slf4j:slf4j-api</exclude>
+        <exclude>org.slf4j:slf4j-log4j12</exclude>
+      </excludes>
+    </dependencySet>
+  </dependencySets>
+</assembly>

Propchange: incubator/tez/branches/TEZ-1/tez-dist/src/main/assembly/tez-dist.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine-api/pom.xml
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine-api/pom.xml?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine-api/pom.xml (added)
+++ incubator/tez/branches/TEZ-1/tez-engine-api/pom.xml Thu Apr 18 23:54:18 2013
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez</artifactId>
+    <version>0.2.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>tez-engine-api</artifactId>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

Propchange: incubator/tez/branches/TEZ-1/tez-engine-api/pom.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Input.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Input.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Input.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Input.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * {@link Input} represents a pipe through which a <em>tez</em> task
+ * can get input key/value pairs.
+ */
+public interface Input {
+  
+  /**
+   * Initialize <code>Input</code>.
+   * 
+   * @param conf job configuration
+   * @param master master process controlling the task
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void initialize(Configuration conf, Master master) 
+      throws IOException, InterruptedException;
+  
+  /**
+   * Check if there is another key/value pair.
+   * 
+   * @return true if a key/value pair was read
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public boolean hasNext() throws IOException, InterruptedException;
+
+  /**
+   * Get the next key.
+   * 
+   * @return the current key or null if there is no current key
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public Object getNextKey() throws IOException, InterruptedException;
+  
+  /**
+   * Get the next values.
+   * 
+   * @return the next values that were read, as an {@link Iterable}
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public Iterable<Object> getNextValues() 
+      throws IOException, InterruptedException;
+  
+  /**
+   * The current progress of the {@link Input} through its data.
+   * 
+   * @return a number between 0.0 and 1.0 that is the fraction of the data read
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public float getProgress() throws IOException, InterruptedException;
+  
+  /**
+   * Close this <code>Input</code> for future operations.
+   */
+  public void close() throws IOException;
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Input.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Master.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Master.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Master.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Master.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.api;
+
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
+
+/**
+ * {@link Master} represents the master controlling the {@link Task}. 
+ */
+@ProtocolInfo(protocolName = "Master", protocolVersion = 1)
+public interface Master extends VersionedProtocol {
+
+  // TODO TEZAM3 This likely needs to change to be a little more generic.
+  // Many output / input relationships cannot be captured via this. The current
+  // form primarily works for the existing MR
+
+  TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents(
+      int fromEventIdx, int maxEventsToFetch,
+      TezTaskAttemptID taskAttemptId);
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Master.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Output.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Output.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Output.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Output.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.engine.records.OutputContext;
+
+/**
+ * {@link Output} represents a pipe through which a <em>tez</em> task
+ * can send out outputs.
+ */
+public interface Output {
+
+  /**
+   * Initialize <code>Output</code>.
+   * 
+   * @param conf job configuration
+   * @param master master process controlling the task
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void initialize(Configuration conf, Master master) 
+      throws IOException, InterruptedException;
+
+  /** 
+   * Writes a key/value pair.
+   * @param key the key to write.
+   * @param value the value to write.
+   * @throws IOException
+   * @throws InterruptedException
+   */      
+  public void write(Object key, Object value
+                             ) throws IOException, InterruptedException;
+
+  /**
+   * Returns the OutputContext for the particular <code>Output</code>. 
+   * 
+   * @return the OutputContext for this Output if it exists, otherwise null.
+   */
+  public OutputContext getOutputContext();
+  
+  /** 
+   * Close this <code>Output</code> for future operations.
+   * @throws IOException
+   * @throws InterruptedException
+   */ 
+  public void close() throws IOException, InterruptedException;
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Output.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Partitioner.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Partitioner.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Partitioner.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Partitioner.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.api;
+
+/**
+ * {@link Partitioner} is used by the TEZ framework to partition 
+ * output key/value pairs.
+ */
+public interface Partitioner {
+  
+  /**
+   * Get partition for given key/value.
+   * @param key key
+   * @param value value
+   * @param numPartitions number of partitions
+   * @return the partition for the given key/value
+   */
+  int getPartition(Object key, Object value, int numPartitions);
+  
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Partitioner.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Processor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Processor.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Processor.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Processor.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * {@link Processor} represents the <em>tez</em> entity responsible for
+ * consuming {@link Input} and producing {@link Output}. 
+ */
+public interface Processor {
+  
+  /**
+   * Initialize the <code>Processor</code>.
+   * 
+   * @param conf job-configuration
+   * @param master master process controlling the task
+   * @throws IOException 
+   * @throws InterruptedException
+   */
+  public void initialize(Configuration conf, Master master) 
+      throws IOException, InterruptedException;
+  
+  /**
+   * Process input data from <code>in</code> and 
+   * send it to <code>out</code>. The master is passed to
+   * {@link #initialize(Configuration, Master)}, not to this method.
+   * 
+   * @param in input
+   * @param out output
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void process(Input in, Output  out) 
+      throws IOException, InterruptedException;
+
+  /**
+   * Close the {@link Processor}.
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void close() throws IOException, InterruptedException;
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Processor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * {@link Task} is the base <em>tez</em> entity which consumes 
+ * input key/value pairs through an {@link Input} pipe, 
+ * processes them via a {@link Processor} and 
+ * produces output key/value pairs for an {@link Output} pipe.
+ */
+public interface Task {
+  
+  /**
+   * Initialize the {@link Task}.
+   * 
+   * @param conf task configuration
+   * @param master master controlling the task
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void initialize(Configuration conf, Master master) 
+      throws IOException, InterruptedException;
+  
+  /**
+   * Get {@link Input} of the task.
+   * @return <code>Input</code> of the task
+   */
+  public Input getInput();
+
+  /**
+   * Get {@link Processor} of the task.
+   * @return <code>Processor</code> of the task
+   */
+  public Processor getProcessor();
+
+  /**
+   * Get {@link Output} of the task.
+   * @return <code>Output</code> of the task
+   */
+  public Output getOutput();
+
+  /**
+   * Run the {@link Task}.
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void run() throws IOException, InterruptedException;
+  
+  /**
+   * Close the {@link Task}.
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void close() throws IOException, InterruptedException;
+  
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/api/Task.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/OutputContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/OutputContext.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/OutputContext.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/OutputContext.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.engine.records;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+//TODO EVENTUALLY Add more interfaces. Maybe make this an abstract class.
+/**
+ * Contains context information for Output (e.g. shuffle headers, size of
+ * output); currently only the shuffle port. TODO Ideally Output specific.
+ */
+
+public class OutputContext implements Writable {
+
+  public OutputContext(int shufflePort) {
+    this.shufflePort = shufflePort;
+  }
+  
+  public OutputContext() { // shufflePort stays 0 unless set later, e.g. by readFields()
+  }
+
+  public int shufflePort;
+
+  public int getShufflePort() {
+    return this.shufflePort;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(shufflePort); // serialized form is this single int
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    shufflePort = in.readInt(); // mirrors write(): reads back the single int
+  }
+  
+  @Override
+  public String toString() {
+    return "shufflePort: " + shufflePort;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/OutputContext.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message