tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject svn commit: r1469642 [21/36] - in /incubator/tez/branches/TEZ-1: ./ example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/ example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/ tez-common/src/main/ tez-common/src/main/java/ t...
Date Thu, 18 Apr 2013 23:54:28 GMT
Added: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezDAGID.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezDAGID.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezDAGID.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezDAGID.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.records;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+
+
+/**
+ * TezDAGID represents the immutable and unique identifier for
+ * a Tez DAG.
+ *
+ * TezDAGID consists of 2 parts. The first part is the {@link ApplicationId},
+ * that is the YARN Application ID that this DAG belongs to. The second part is
+ * the DAG number.
+ *
+ * @see ApplicationId
+ */
+public class TezDAGID extends TezID {
+
+  public static final String DAG = "dag";
+  protected static final NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setGroupingUsed(false);
+    idFormat.setMinimumIntegerDigits(6);
+  }
+
+  private ApplicationId applicationId;
+
+  public TezDAGID() {
+  }
+
+  /**
+   * Constructs a DAGID object from given {@link ApplicationId}.
+   * @param applicationId Application that this dag belongs to
+   * @param id the dag number
+   */
+  public TezDAGID(ApplicationId applicationId, int id) {
+    super(id);
+    if(applicationId == null) {
+      throw new IllegalArgumentException("applicationId cannot be null");
+    }
+    this.applicationId = applicationId;
+  }
+
+  /**
+   * Constructs a DAGID object from given parts.
+   * @param yarnRMIdentifier YARN RM identifier
+   * @param applicationId application number
+   * @param id the dag number
+   */
+  public TezDAGID(String yarnRMIdentifier, int appId, int id) {
+    this(BuilderUtils.newApplicationId(Long.valueOf(yarnRMIdentifier),
+        appId), id);
+  }
+
+  /** Returns the {@link ApplicationId} object that this dag belongs to */
+  public ApplicationId getApplicationId() {
+    return applicationId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!super.equals(o))
+      return false;
+
+    TezDAGID that = (TezDAGID)o;
+    return this.applicationId.equals(that.applicationId);
+  }
+
+  /**Compare TaskInProgressIds by first jobIds, then by tip numbers and type.*/
+  @Override
+  public int compareTo(TezID o) {
+    TezDAGID that = (TezDAGID)o;
+    return this.applicationId.compareTo(that.applicationId);
+  }
+
+  @Override
+  public String toString() {
+    return appendTo(new StringBuilder(DAG)).toString();
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    applicationId = BuilderUtils.newApplicationId(in.readLong(), in.readInt());
+    super.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(applicationId.getClusterTimestamp());
+    out.writeInt(applicationId.getId());
+    super.write(out);
+  }
+
+  /**
+   * Add the unique string to the given builder.
+   * @param builder the builder to append to
+   * @return the builder that was passed in
+   */
+  protected StringBuilder appendTo(StringBuilder builder) {
+    return builder.append(SEPARATOR).
+                 append(applicationId.getClusterTimestamp()).
+                 append(SEPARATOR).
+                 append(applicationId.getId()).
+                 append(SEPARATOR).
+                 append(idFormat.format(id));
+  }
+
+  @Override
+  public int hashCode() {
+    return applicationId.hashCode() * 524287 + id;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezDAGID.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezDependentTaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezDependentTaskCompletionEvent.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezDependentTaskCompletionEvent.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezDependentTaskCompletionEvent.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.records;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Describes the completion of a task attempt: the attempt's id, its final
+ * {@link Status}, the http location it ran at, its run time and an event id.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+// TODO TEZAM3 This needs to be more generic. Maybe some kind of a serialized
+// blob - which can be interpreted by the Input plugin.
+public class TezDependentTaskCompletionEvent implements Writable {
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  // TODO EVENTUALLY - Remove TIPFAILED state ?
+  static public enum Status {FAILED, KILLED, SUCCEEDED, OBSOLETE, TIPFAILED};
+
+  private int eventId;
+  // TODO EVENTUALLY - rename.
+  private String taskTrackerHttp;
+  private int taskRunTime; // using int since runtime is the time difference
+  private TezTaskAttemptID taskAttemptId;
+  Status status;
+  // TODO TEZAM2 Get rid of the isMap field. Job specific type information can be determined from TaskAttemptId.getTaskType
+//  boolean isMap = false;
+  public static final TezDependentTaskCompletionEvent[] EMPTY_ARRAY =
+    new TezDependentTaskCompletionEvent[0];
+
+  /**
+   * Creates an event with an empty task attempt id. The status and the http
+   * location stay <code>null</code> until they are set or read via
+   * {@link #readFields(DataInput)}.
+   */
+  public TezDependentTaskCompletionEvent() {
+    taskAttemptId = new TezTaskAttemptID();
+  }
+
+  /**
+   * Constructor. eventId should be created externally and incremented
+   * per event for each job.
+   * @param eventId event id, event id should be unique and assigned
+   *  incrementally, starting from 0.
+   * @param taskId task attempt id
+   * @param status task's status
+   * @param taskTrackerHttp task tracker's host:port for http.
+   * @param runTime time (in millisec) the task took to complete
+   */
+  public TezDependentTaskCompletionEvent(int eventId,
+                             TezTaskAttemptID taskId,
+//                             boolean isMap,
+                             Status status,
+                             String taskTrackerHttp,
+                             int runTime){
+
+    this.taskAttemptId = taskId;
+//    this.isMap = isMap;
+    this.eventId = eventId;
+    this.status = status;
+    this.taskTrackerHttp = taskTrackerHttp;
+    this.taskRunTime = runTime;
+  }
+
+  /**
+   * Returns event Id.
+   * @return event id
+   */
+  public int getEventId() {
+    return eventId;
+  }
+
+  /**
+   * Returns the task attempt id.
+   * @return task attempt id
+   */
+  public TezTaskAttemptID getTaskAttemptID() {
+    return taskAttemptId;
+  }
+
+  /**
+   * Returns the {@link Status} of the task attempt.
+   * @return task status, or <code>null</code> if it has not been set
+   */
+  public Status getStatus() {
+    return status;
+  }
+
+  /**
+   * http location of the tasktracker where this task ran.
+   * @return http location of tasktracker user logs
+   */
+  public String getTaskTrackerHttp() {
+    return taskTrackerHttp;
+  }
+
+  /**
+   * Returns time (in millisec) the task took to complete.
+   */
+  public int getTaskRunTime() {
+    return taskRunTime;
+  }
+
+  /**
+   * Set the task completion time
+   * @param taskCompletionTime time (in millisec) the task took to complete
+   */
+  protected void setTaskRunTime(int taskCompletionTime) {
+    this.taskRunTime = taskCompletionTime;
+  }
+
+  /**
+   * set event Id. should be assigned incrementally starting from 0.
+   * @param eventId the event id
+   */
+  public void setEventId(int eventId) {
+    this.eventId = eventId;
+  }
+
+  /**
+   * Sets task attempt id.
+   * @param taskId the task attempt id
+   */
+  public void setTaskAttemptID(TezTaskAttemptID taskId) {
+    this.taskAttemptId = taskId;
+  }
+
+  /**
+   * Set task status.
+   * @param status the task status
+   */
+  public void setTaskStatus(Status status) {
+    this.status = status;
+  }
+
+  /**
+   * Set task tracker http location.
+   * @param taskTrackerHttp the http location
+   */
+  public void setTaskTrackerHttp(String taskTrackerHttp) {
+    this.taskTrackerHttp = taskTrackerHttp;
+  }
+
+  /**
+   * Returns "Task Id : &lt;attempt id&gt;, Status : &lt;status&gt;". A status
+   * that has not been set yet is rendered as "null" instead of failing.
+   */
+  @Override
+  public String toString(){
+    StringBuilder buf = new StringBuilder();
+    buf.append("Task Id : ");
+    buf.append(taskAttemptId);
+    buf.append(", Status : ");
+    // append(Object) prints the constant's name and tolerates a null status.
+    buf.append(status);
+    return buf.toString();
+  }
+
+  /**
+   * Two events are equal when they have the same class and equal event id,
+   * status, task attempt id, run time and http location. Fields that are
+   * <code>null</code> on both sides are treated as equal.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || !o.getClass().equals(this.getClass())) {
+      return false;
+    }
+    TezDependentTaskCompletionEvent event = (TezDependentTaskCompletionEvent) o;
+    return this.eventId == event.getEventId()
+           && this.status == event.getStatus() // enum constants are singletons
+           && nullSafeEquals(this.taskAttemptId, event.getTaskAttemptID())
+           && this.taskRunTime == event.getTaskRunTime()
+           && nullSafeEquals(this.taskTrackerHttp, event.getTaskTrackerHttp());
+  }
+
+  /** Returns true if both arguments are null or a.equals(b). */
+  private static boolean nullSafeEquals(Object a, Object b) {
+    return (a == null) ? (b == null) : a.equals(b);
+  }
+
+  /** Hash of the string form, which is built from the attempt id and status. */
+  @Override
+  public int hashCode() {
+    return toString().hashCode();
+  }
+
+  /**
+   * Writes, in order: the task attempt id, the status, the http location,
+   * the run time and the event id.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    taskAttemptId.write(out);
+//    out.writeBoolean(isMap);
+    WritableUtils.writeEnum(out, status);
+    WritableUtils.writeString(out, taskTrackerHttp);
+    WritableUtils.writeVInt(out, taskRunTime);
+    WritableUtils.writeVInt(out, eventId);
+  }
+
+  /**
+   * Reads the fields in the order in which {@link #write(DataOutput)}
+   * emits them.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskAttemptId.readFields(in);
+//    isMap = in.readBoolean();
+    status = WritableUtils.readEnum(in, Status.class);
+    taskTrackerHttp = WritableUtils.readString(in);
+    taskRunTime = WritableUtils.readVInt(in);
+    eventId = WritableUtils.readVInt(in);
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezDependentTaskCompletionEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezID.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezID.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezID.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezID.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.records;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * A general identifier, which internally stores the id
+ * as an integer. This is the super class of {@link TezDAGID},
+ * {@link TezVertexID}, {@link TezTaskID} and {@link TezTaskAttemptID}.
+ *
+ * @see TezTaskID
+ * @see TezTaskAttemptID
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public abstract class TezID implements WritableComparable<TezID> {
+  public static final char SEPARATOR = '_';
+  protected int id;
+
+  /** constructs an ID object from the given int */
+  public TezID(int id) {
+    this.id = id;
+  }
+
+  /** constructs an ID whose number is 0 until {@link #readFields(DataInput)} is called */
+  protected TezID() {
+  }
+
+  /** returns the int which represents the identifier */
+  public int getId() {
+    return id;
+  }
+
+  @Override
+  public String toString() {
+    return String.valueOf(id);
+  }
+
+  @Override
+  public int hashCode() {
+    return id;
+  }
+
+  /**
+   * Two IDs are equal when they are of exactly the same class and hold the
+   * same number.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || o.getClass() != this.getClass()) {
+      return false;
+    }
+    TezID that = (TezID) o;
+    return this.id == that.id;
+  }
+
+  /** Compare IDs by associated numbers */
+  @Override
+  public int compareTo(TezID that) {
+    // Explicit comparison: "this.id - that.id" overflows when the two ids
+    // have opposite signs and are far apart, yielding the wrong sign.
+    return (this.id < that.id) ? -1 : ((this.id == that.id) ? 0 : 1);
+  }
+
+  /** Reads the id as a single int. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.id = in.readInt();
+  }
+
+  /** Writes the id as a single int. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(id);
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezID.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezTaskAttemptID.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezTaskAttemptID.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezTaskAttemptID.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezTaskAttemptID.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.records;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * TezTaskAttemptID represents the unique identifier for
+ * a task attempt. Each task attempt is one particular instance of a Tez Task
+ * identified by its TezTaskID.
+ *
+ * TezTaskAttemptID consists of 2 parts. First part is the
+ * {@link TezTaskID}, that this TaskAttemptID belongs to.
+ * Second part is the task attempt number. <br>
+ * <p>
+ * Applications should never construct or parse TaskAttemptID strings,
+ * but rather use the appropriate constructors.
+ *
+ * @see TezTaskID
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class TezTaskAttemptID extends TezID {
+  public static final String ATTEMPT = "attempt";
+  private TezTaskID taskId;
+
+  /**
+   * Creates an attempt id with an empty {@link TezTaskID}; the fields are
+   * filled in by {@link #readFields(DataInput)}.
+   */
+  public TezTaskAttemptID() {
+    taskId = new TezTaskID();
+  }
+
+  /**
+   * Constructs a TaskAttemptID object from given {@link TezTaskID}.
+   * @param taskId TaskID that this task belongs to
+   * @param id the task attempt number
+   * @throws IllegalArgumentException if taskId is null
+   */
+  public TezTaskAttemptID(TezTaskID taskId, int id) {
+    super(id);
+    if (taskId == null) {
+      throw new IllegalArgumentException("taskId cannot be null");
+    }
+    this.taskId = taskId;
+  }
+
+  /** Returns the {@link TezTaskID} object that this task attempt belongs to */
+  public TezTaskID getTaskID() {
+    return taskId;
+  }
+
+  /**
+   * Two attempt ids are equal when they have the same class, the same attempt
+   * number and equal task ids.
+   */
+  @Override
+  public boolean equals(Object o) {
+    // super.equals() has already verified the runtime class and the number.
+    if (!super.equals(o)) {
+      return false;
+    }
+    TezTaskAttemptID that = (TezTaskAttemptID)o;
+    return this.taskId.equals(that.taskId);
+  }
+
+  /**
+   * Add the unique string to the StringBuilder
+   * @param builder the builder to append to
+   * @return the builder that was passed in.
+   */
+  protected StringBuilder appendTo(StringBuilder builder) {
+    return taskId.appendTo(builder).append(SEPARATOR).append(id);
+  }
+
+  @Override
+  public int hashCode() {
+    return taskId.hashCode() * 539501 + id;
+  }
+
+  /**
+   * Compares attempt ids first by task id, then by attempt number.
+   * @throws ClassCastException if the argument is not a TezTaskAttemptID
+   */
+  @Override
+  public int compareTo(TezID o) {
+    TezTaskAttemptID that = (TezTaskAttemptID)o;
+    int taskComp = this.taskId.compareTo(that.taskId);
+    if (taskComp != 0) {
+      return taskComp;
+    }
+    // Explicit comparison instead of subtraction, which can overflow.
+    return (this.id < that.id) ? -1 : ((this.id == that.id) ? 0 : 1);
+  }
+
+  @Override
+  public String toString() {
+    return appendTo(new StringBuilder(ATTEMPT)).toString();
+  }
+
+  /** Reads the task id followed by the attempt number. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskId.readFields(in);
+    super.readFields(in);
+  }
+
+  /** Writes the task id followed by the attempt number. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    taskId.write(out);
+    super.write(out);
+  }
+
+  // FIXME TEZ DAG needs to be removed
+  /**
+   * Creates a new attempt id and populates it from the given input.
+   * @param in the input to read from
+   * @return the attempt id that was read
+   */
+  public static TezTaskAttemptID read(DataInput in) throws IOException {
+    TezTaskAttemptID tId = new TezTaskAttemptID();
+    tId.readFields(in);
+    return tId;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezTaskAttemptID.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezTaskDependencyCompletionEventsUpdate.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezTaskDependencyCompletionEventsUpdate.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezTaskDependencyCompletionEventsUpdate.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezTaskDependencyCompletionEventsUpdate.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.records;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+
+public class TezTaskDependencyCompletionEventsUpdate implements Writable {
+  TezDependentTaskCompletionEvent[] events;
+  boolean reset;
+
+  public TezTaskDependencyCompletionEventsUpdate() { }
+
+  public TezTaskDependencyCompletionEventsUpdate(
+      TezDependentTaskCompletionEvent[] events, boolean reset) {
+    this.events = events;
+    this.reset = reset;
+  }
+
+  public boolean shouldReset() {
+    return reset;
+  }
+
+  public TezDependentTaskCompletionEvent[] getDependentTaskCompletionEvents() {
+    return events;
+  }
+  
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(reset);
+    out.writeInt(events.length);
+    for (TezDependentTaskCompletionEvent event : events) {
+      event.write(out);
+    }
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    reset = in.readBoolean();
+    events = new TezDependentTaskCompletionEvent[in.readInt()];
+    for (int i = 0; i < events.length; ++i) {
+      events[i] = new TezDependentTaskCompletionEvent();
+      events[i].readFields(in);
+    }
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezTaskDependencyCompletionEventsUpdate.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezTaskID.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezTaskID.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezTaskID.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezTaskID.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.records;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+
+/**
+ * TaskID represents the unique identifier for
+ * a Tez Task. Each TaskID encompasses multiple attempts made to
+ * execute the Tez Task, each of which are uniquely identified by
+ * their TezTaskAttemptID.
+ *
+ * @see TezTaskAttemptID
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class TezTaskID extends TezID {
+  public static final String TASK = "task";
+  protected static final NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setGroupingUsed(false);
+    idFormat.setMinimumIntegerDigits(6);
+  }
+
+  private TezVertexID vertexId;
+
+  /**
+   * Creates a task id with an empty {@link TezVertexID}; the fields are
+   * filled in by {@link #readFields(DataInput)}.
+   */
+  public TezTaskID() {
+    vertexId = new TezVertexID();
+  }
+
+  /**
+   * Constructs a TaskID object from given {@link TezVertexID}.
+   * @param vertexId the vertex that this task belongs to
+   * @param id the task number
+   * @throws IllegalArgumentException if vertexId is null
+   */
+  public TezTaskID(TezVertexID vertexId, int id) {
+    super(id);
+    if (vertexId == null) {
+      throw new IllegalArgumentException("vertexId cannot be null");
+    }
+    this.vertexId = vertexId;
+  }
+
+  /** Returns the {@link TezVertexID} object that this task belongs to */
+  public TezVertexID getVertexID() {
+    return vertexId;
+  }
+
+  /**
+   * Two task ids are equal when they have the same class, the same task
+   * number and equal vertex ids.
+   */
+  @Override
+  public boolean equals(Object o) {
+    // super.equals() has already verified the runtime class and the number.
+    if (!super.equals(o)) {
+      return false;
+    }
+    TezTaskID that = (TezTaskID)o;
+    return this.vertexId.equals(that.vertexId);
+  }
+
+  /**
+   * Compares task ids first by vertex id, then by task number.
+   * @throws ClassCastException if the argument is not a TezTaskID
+   */
+  @Override
+  public int compareTo(TezID o) {
+    TezTaskID that = (TezTaskID)o;
+    int vertexComp = this.vertexId.compareTo(that.vertexId);
+    if (vertexComp != 0) {
+      return vertexComp;
+    }
+    // Explicit comparison instead of subtraction, which can overflow.
+    return (this.id < that.id) ? -1 : ((this.id == that.id) ? 0 : 1);
+  }
+
+  @Override
+  public String toString() {
+    return appendTo(new StringBuilder(TASK)).toString();
+  }
+
+  /**
+   * Add the unique string to the given builder.
+   * @param builder the builder to append to
+   * @return the builder that was passed in
+   */
+  protected StringBuilder appendTo(StringBuilder builder) {
+    return vertexId.appendTo(builder).
+                 append(SEPARATOR).
+                 append(idFormat.format(id));
+  }
+
+  @Override
+  public int hashCode() {
+    return vertexId.hashCode() * 535013 + id;
+  }
+
+  /** Reads the vertex id followed by the task number. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    vertexId.readFields(in);
+    super.readFields(in);
+  }
+
+  /** Writes the vertex id followed by the task number. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    vertexId.write(out);
+    super.write(out);
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezTaskID.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezTaskType.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezTaskType.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezTaskType.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezTaskType.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,26 @@
+package org.apache.tez.engine.records;
+///**
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements. See the NOTICE file distributed with this
+// * work for additional information regarding copyright ownership. The ASF
+// * licenses this file to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance with the License.
+// * You may obtain a copy of the License at
+// * 
+// * http://www.apache.org/licenses/LICENSE-2.0
+// * 
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+// * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+// * License for the specific language governing permissions and limitations under
+// * the License.
+// */
+//
+//package org.apache.tez.records;
+//
+//
+//public interface TezTaskType {
+//  
+//  public String toSerializedString();
+//
+//}

Propchange: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezTaskType.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezVertexID.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezVertexID.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezVertexID.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezVertexID.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.records;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * TezVertexID represents the immutable and unique identifier for
+ * a Vertex in a Tez DAG. Each TezVertexID encompasses multiple Tez Tasks.
+ *
+ * TezVertezID consists of 2 parts. The first part is the {@link TezDAGID},
+ * that is the Tez DAG that this vertex belongs to. The second part is
+ * the vertex number.
+ *
+ * @see TezDAGID
+ * @see TezTaskID
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class TezVertexID extends TezID {
+  public static final String VERTEX = "vertex";
+  protected static final NumberFormat idFormat = NumberFormat.getInstance();
+  static {
+    idFormat.setGroupingUsed(false);
+    idFormat.setMinimumIntegerDigits(6);
+  }
+  
+  private TezDAGID dagId;
+  
+  public TezVertexID() {
+  }
+  
+  /**
+   * Constructs a TaskID object from given {@link TezDAGID}.
+   * @param applicationId JobID that this tip belongs to
+   * @param type the {@link TezTaskType} of the task
+   * @param id the tip number
+   */
+  public TezVertexID(TezDAGID dagId, int id) {
+    super(id);
+    if(dagId == null) {
+      throw new IllegalArgumentException("dagId cannot be null");
+    }
+    this.dagId = dagId;
+  }
+  
+  /** Returns the {@link TezDAGID} object that this tip belongs to */
+  public TezDAGID getDAGId() {
+    return dagId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!super.equals(o))
+      return false;
+
+    TezVertexID that = (TezVertexID)o;
+    return this.dagId.equals(that.dagId);
+  }
+
+  /**Compare TaskInProgressIds by first jobIds, then by tip numbers and type.*/
+  @Override
+  public int compareTo(TezID o) {
+    TezVertexID that = (TezVertexID)o;
+    return this.dagId.compareTo(that.dagId);
+  }
+  
+  @Override
+  public String toString() { 
+    return appendTo(new StringBuilder(VERTEX)).toString();
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    dagId = new TezDAGID();
+    dagId.readFields(in);
+    super.readFields(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    dagId.write(out);
+    super.write(out);
+  }
+
+  /**
+   * Add the unique string to the given builder.
+   * @param builder the builder to append to
+   * @return the builder that was passed in
+   */
+  protected StringBuilder appendTo(StringBuilder builder) {
+    return builder.append(SEPARATOR).
+        append(dagId.getApplicationId().getClusterTimestamp()).
+        append(SEPARATOR).
+        append(dagId.getApplicationId().getId()).
+        append(SEPARATOR).
+        append(dagId.getId()).
+        append(SEPARATOR).
+        append(idFormat.format(id));
+  }
+  
+  @Override
+  public int hashCode() {
+    return dagId.hashCode() * 530017 + id;
+  }
+  
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine-api/src/main/java/org/apache/tez/engine/records/TezVertexID.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/pom.xml
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/pom.xml?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/pom.xml (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/pom.xml Thu Apr 18 23:54:18 2013
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<!-- Maven build definition for the tez-engine module. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <!-- groupId and version are not repeated below; they come from this parent. -->
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez</artifactId>
+    <version>0.2.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>tez-engine</artifactId>
+
+  <!-- No dependency declares a version here; presumably they are managed by
+       the parent POM (TODO confirm). -->
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.inject.extensions</groupId>
+      <artifactId>guice-assistedinject</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-nodemanager</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <!-- Apache RAT plugin with an empty, module-specific configuration. -->
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>

Propchange: incubator/tez/branches/TEZ-1/tez-engine/pom.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/hadoop/io/BufferUtils.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/hadoop/io/BufferUtils.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/hadoop/io/BufferUtils.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/hadoop/io/BufferUtils.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Static helpers that compare and copy the contents of
+ * {@link DataInputBuffer} and {@link DataOutputBuffer} instances.
+ *
+ * For a DataInputBuffer the bytes considered are those from
+ * {@code getPosition()} up to {@code getLength()}, i.e.
+ * {@code getLength() - getPosition()} bytes, which is the same convention
+ * {@link #copy(DataInputBuffer, DataOutputBuffer)} uses. For a
+ * DataOutputBuffer the first {@code getLength()} bytes are considered.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
+@InterfaceStability.Unstable
+public class BufferUtils {
+  /**
+   * Compares the unread bytes of two input buffers.
+   *
+   * @param buf1 left-hand buffer
+   * @param buf2 right-hand buffer
+   * @return the result of FastByteComparisons.compareTo for the two regions
+   */
+  public static int compare(DataInputBuffer buf1, DataInputBuffer buf2) {
+    byte[] b1 = buf1.getData();
+    byte[] b2 = buf2.getData();
+    int s1 = buf1.getPosition();
+    int s2 = buf2.getPosition();
+    // getLength() is an end offset for DataInputBuffer, so turn it into a
+    // byte count relative to the current position.
+    int l1 = buf1.getLength() - s1;
+    int l2 = buf2.getLength() - s2;
+    return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
+  }
+
+  /**
+   * Compares the bytes written so far to two output buffers.
+   *
+   * @param buf1 left-hand buffer
+   * @param buf2 right-hand buffer
+   * @return the result of FastByteComparisons.compareTo for the two regions
+   */
+  public static int compare(DataOutputBuffer buf1, DataOutputBuffer buf2) {
+    byte[] b1 = buf1.getData();
+    byte[] b2 = buf2.getData();
+    int s1 = 0;
+    int s2 = 0;
+    int l1 = buf1.getLength();
+    int l2 = buf2.getLength();
+    return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
+  }
+
+  /**
+   * Compares the unread bytes of an input buffer with the bytes written so
+   * far to an output buffer.
+   *
+   * @param buf1 left-hand (input) buffer
+   * @param buf2 right-hand (output) buffer
+   * @return the result of FastByteComparisons.compareTo for the two regions
+   */
+  public static int compare(DataInputBuffer buf1, DataOutputBuffer buf2) {
+    byte[] b1 = buf1.getData();
+    byte[] b2 = buf2.getData();
+    int s1 = buf1.getPosition();
+    int s2 = 0;
+    // End offset converted to a byte count, as in the input/input overload.
+    int l1 = buf1.getLength() - s1;
+    int l2 = buf2.getLength();
+    return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
+  }
+
+  /**
+   * Compares the bytes written so far to an output buffer with the unread
+   * bytes of an input buffer.
+   *
+   * The operands are passed in their declared order; delegating to the
+   * (input, output) overload with swapped arguments would invert the sign
+   * of the result.
+   *
+   * @param buf1 left-hand (output) buffer
+   * @param buf2 right-hand (input) buffer
+   * @return the result of FastByteComparisons.compareTo for the two regions
+   */
+  public static int compare(DataOutputBuffer buf1, DataInputBuffer buf2) {
+    byte[] b1 = buf1.getData();
+    byte[] b2 = buf2.getData();
+    int s1 = 0;
+    int s2 = buf2.getPosition();
+    int l1 = buf1.getLength();
+    int l2 = buf2.getLength() - s2;
+    return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
+  }
+
+  /**
+   * Resets {@code dst} and writes the unread bytes of {@code src} into it.
+   *
+   * @param src buffer to copy from; its position is not changed here
+   * @param dst buffer that is reset and then filled
+   * @throws IOException if writing to {@code dst} fails
+   */
+  public static void copy(DataInputBuffer src, DataOutputBuffer dst)
+                              throws IOException {
+    byte[] b1 = src.getData();
+    int s1 = src.getPosition();
+    int l1 = src.getLength();
+    dst.reset();
+    dst.write(b1, s1, l1 - s1);
+  }
+
+  /**
+   * Resets {@code dst} and writes the bytes held by {@code src} into it.
+   *
+   * @param src buffer to copy from
+   * @param dst buffer that is reset and then filled
+   * @throws IOException if writing to {@code dst} fails
+   */
+  public static void copy(DataOutputBuffer src, DataOutputBuffer dst)
+                              throws IOException {
+    byte[] b1 = src.getData();
+    int s1 = 0;
+    int l1 = src.getLength();
+    dst.reset();
+    dst.write(b1, s1, l1);
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/hadoop/io/BufferUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/hadoop/io/HashComparator.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/hadoop/io/HashComparator.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/hadoop/io/HashComparator.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/hadoop/io/HashComparator.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.io;
+
+/**
+ * Contract for objects that can produce an integer hash code for a key of
+ * type KEY.
+ *
+ * @param <KEY> the type of key that is hashed
+ */
+public interface HashComparator<KEY> {
+
+  /**
+   * Returns the hash code for the given key.
+   *
+   * @param key the key to hash
+   * @return the hash code computed for {@code key}
+   */
+  int getHashCode(KEY key);
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/hadoop/io/HashComparator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Writable that carries an optional {@link TezEngineTask} together with a
+ * shouldDie flag.
+ *
+ * Serialized form: the shouldDie boolean, a boolean stating whether a task
+ * follows, and then the task itself when one is present.
+ */
+public class ContainerTask implements Writable {
+
+  TezEngineTask tezEngineTask;
+  boolean shouldDie;
+
+  /** Creates an empty instance to be populated by {@link #readFields}. */
+  public ContainerTask() {
+  }
+
+  /**
+   * @param tezTaskContext the task to carry; it is cast to
+   *          {@link TezEngineTask}
+   * @param shouldDie the value later returned by {@link #shouldDie()}
+   */
+  public ContainerTask(TezTask tezTaskContext, boolean shouldDie) {
+    this.tezEngineTask = (TezEngineTask)tezTaskContext;
+    this.shouldDie = shouldDie;
+  }
+
+  /** Returns the carried task, or null when none is set. */
+  public TezEngineTask getTezEngineTaskContext() {
+    return tezEngineTask;
+  }
+
+  /** Returns the shouldDie flag. */
+  public boolean shouldDie() {
+    return shouldDie;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(shouldDie);
+    if (tezEngineTask != null) {
+      out.writeBoolean(true);
+      tezEngineTask.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    shouldDie = in.readBoolean();
+    boolean taskComing = in.readBoolean();
+    if (taskComing) {
+      tezEngineTask = new TezEngineTask();
+      tezEngineTask.readFields(in);
+    } else {
+      // Drop any task left over from an earlier readFields() call so that a
+      // reused instance reflects exactly what was serialized.
+      tezEngineTask = null;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "shouldDie: " + shouldDie + ", tezEngineTaskContext: "
+        + tezEngineTask;
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTask.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTask.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTask.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+
+/**
+ * A {@link TezTask} that additionally carries the name of the task module
+ * class and the lists of input and output specifications.
+ *
+ * Serialized form, after the superclass fields: the module class name, the
+ * number of input specs followed by each spec, then the number of output
+ * specs followed by each spec.
+ */
+public class TezEngineTask extends TezTask {
+
+  // These two could be replaced by a TezConfiguration / DagSpec.
+  private List<InputSpec> inputSpecList;
+  private List<OutputSpec> outputSpecList;
+  private String taskModuleClassName;
+
+  /** Creates an empty instance to be populated by {@link #readFields}. */
+  public TezEngineTask() {
+    super();
+  }
+
+  /**
+   * @param taskAttemptID passed to the superclass constructor
+   * @param user passed to the superclass constructor
+   * @param jobName passed to the superclass constructor
+   * @param vertexName passed to the superclass constructor
+   * @param moduleClassName name returned by {@link #getTaskModuleClassName()}
+   * @param inputSpecList input specs; null is replaced by an empty list
+   * @param outputSpecList output specs; null is replaced by an empty list
+   */
+  public TezEngineTask(TezTaskAttemptID taskAttemptID, String user,
+      String jobName, String vertexName, String moduleClassName,
+      List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList) {
+    super(taskAttemptID, user, jobName, vertexName);
+    // Each list is assigned exactly once; a null argument becomes an empty
+    // list so that the getters and write() never see null.
+    this.inputSpecList = (inputSpecList != null)
+        ? inputSpecList : new ArrayList<InputSpec>(0);
+    this.outputSpecList = (outputSpecList != null)
+        ? outputSpecList : new ArrayList<OutputSpec>(0);
+    this.taskModuleClassName = moduleClassName;
+  }
+
+  /** Returns the task module class name. */
+  public String getTaskModuleClassName() {
+    return taskModuleClassName;
+  }
+
+  /** Returns the list of input specs (the internal list, not a copy). */
+  public List<InputSpec> getInputSpecList() {
+    return this.inputSpecList;
+  }
+
+  /** Returns the list of output specs (the internal list, not a copy). */
+  public List<OutputSpec> getOutputSpecList() {
+    return this.outputSpecList;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    Text.writeString(out, taskModuleClassName);
+    out.writeInt(inputSpecList.size());
+    for (InputSpec inputSpec : inputSpecList) {
+      inputSpec.write(out);
+    }
+    out.writeInt(outputSpecList.size());
+    for (OutputSpec outputSpec : outputSpecList) {
+      outputSpec.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+
+    taskModuleClassName = Text.readString(in);
+    // Both lists are rebuilt from scratch, sized from the serialized counts.
+    int numInputSpecs = in.readInt();
+    inputSpecList = new ArrayList<InputSpec>(numInputSpecs);
+    for (int i = 0; i < numInputSpecs; i++) {
+      InputSpec inputSpec = new InputSpec();
+      inputSpec.readFields(in);
+      inputSpecList.add(inputSpec);
+    }
+    int numOutputSpecs = in.readInt();
+    outputSpecList = new ArrayList<OutputSpec>(numOutputSpecs);
+    for (int i = 0; i < numOutputSpecs; i++) {
+      OutputSpec outputSpec = new OutputSpec();
+      outputSpec.readFields(in);
+      outputSpecList.add(outputSpec);
+    }
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/common/TezEngineTask.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,101 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class ConfigUtils {
+  public static  Class<? extends CompressionCodec> getMapOutputCompressorClass(
+      Configuration conf, Class<DefaultCodec> class1) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  public static  boolean getCompressMapOutput(Configuration conf) {
+    // TODO Auto-generated method stub
+    return false;
+  }
+
+  public static <V> Class<V> getMapOutputValueClass(Configuration conf) {
+    Class<V> retv = 
+        (Class<V>) 
+        conf.getClass("mapreduce.map.output.value.class", null, Object.class);
+    if (retv == null) {
+      retv = getOutputValueClass(conf);
+    }
+    return retv;
+  }
+
+  public static <V> Class<V> getOutputValueClass(Configuration conf) {
+    return (Class<V>) conf.getClass(
+        "mapreduce.job.output.value.class", Text.class, Object.class);
+  }
+
+  public static <K> Class<K> getMapOutputKeyClass(Configuration conf) {
+    Class<K> retv = 
+        (Class<K>) conf.getClass("mapreduce.map.output.key.class", null, Object.class);
+    if (retv == null) {
+      retv = getOutputKeyClass(conf);
+    }
+    return 
+        retv;
+  }
+
+  public static <K> Class<K> getOutputKeyClass(Configuration conf) {
+    return 
+        (Class<K>) 
+        conf.getClass(
+            "mapreduce.job.output.key.class", 
+            LongWritable.class, Object.class);
+}
+  
+  public static <K> RawComparator<K> getOutputKeyComparator(Configuration conf) {
+    Class<? extends RawComparator> theClass = 
+        conf.getClass(
+            "mapreduce.job.output.key.comparator.class", null, 
+            RawComparator.class);
+      if (theClass != null)
+        return ReflectionUtils.newInstance(theClass, conf);
+      return WritableComparator.get(
+          getMapOutputKeyClass(conf).asSubclass(WritableComparable.class));
+    }
+
+  public static <V> RawComparator<V> getOutputValueGroupingComparator(
+      Configuration conf) {
+    Class<? extends RawComparator> theClass = 
+        conf.getClass(
+            "mapreduce.job.output.group.comparator.class", 
+            null, RawComparator.class);
+    if (theClass == null) {
+      return getOutputKeyComparator(conf);
+    }
+
+    return ReflectionUtils.newInstance(theClass, conf);
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/ConfigUtils.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/YARNMaster.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/YARNMaster.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/YARNMaster.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/YARNMaster.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,57 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Static accessors for the ResourceManager settings stored in a
+ * {@link Configuration}.
+ */
+@Private
+@Unstable
+public class YARNMaster {
+
+  /** States declared by this class. */
+  public enum State {
+    INITIALIZING, RUNNING;
+  }
+
+  /**
+   * Returns the value configured under
+   * {@link YarnConfiguration#RM_PRINCIPAL}.
+   */
+  public static String getMasterUserName(Configuration conf) {
+    final String rmPrincipal = conf.get(YarnConfiguration.RM_PRINCIPAL);
+    return rmPrincipal;
+  }
+
+  /**
+   * Returns the socket address configured under
+   * {@link YarnConfiguration#RM_ADDRESS}, using the YARN default address and
+   * port as fallbacks.
+   */
+  public static InetSocketAddress getMasterAddress(Configuration conf) {
+    final InetSocketAddress rmAddress = conf.getSocketAddr(
+        YarnConfiguration.RM_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_PORT);
+    return rmAddress;
+  }
+
+  /**
+   * Builds the server principal from the configured master user name and
+   * the host name of the master address.
+   *
+   * @throws IOException if SecurityUtil cannot produce the principal
+   */
+  public static String getMasterPrincipal(Configuration conf)
+      throws IOException {
+    final InetSocketAddress rmAddress = getMasterAddress(conf);
+    final String rmHost = rmAddress.getHostName();
+    // get kerberos principal for use as delegation token renewer
+    final String principalConfig = getMasterUserName(conf);
+    return SecurityUtil.getServerPrincipal(principalConfig, rmHost);
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/YARNMaster.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.combine;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+
+/**
+ * {@link Input} that walks a {@link TezRawKeyValueIterator} and groups
+ * consecutive records whose raw keys compare as equal.
+ *
+ * {@link #hasNext()} skips any unread values of the current key and loads
+ * the first record of the next key; {@link #getNextKey()} returns that key
+ * and {@link #getNextValues()} returns an Iterable over the values that
+ * share it.
+ *
+ * NOTE(review): nothing in this class assigns the deserializers, the
+ * comparator or the counters, and hasMore is never set to true before the
+ * first read, so initialization looks unfinished.
+ */
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class CombineInput implements Input {
+
+  private final TezRawKeyValueIterator input;
+  private TezCounter inputValueCounter;
+  private TezCounter inputKeyCounter;
+  private RawComparator<Object> comparator;
+  private Object key;                                  // current key
+  private Object value;                              // current value
+  private boolean firstValue = false;                 // first value in key
+  private boolean nextKeyIsSame = false;              // more w/ this key
+  private boolean hasMore;                            // more in file
+  protected Progressable reporter;
+  private Deserializer keyDeserializer;
+  private Deserializer valueDeserializer;
+  private DataInputBuffer buffer = new DataInputBuffer();
+  private BytesWritable currentRawKey = new BytesWritable();
+  private ValueIterable iterable = new ValueIterable();
+
+  /**
+   * @param kvIter the raw key/value iterator to read from; it is closed by
+   *          {@link #close()}
+   */
+  public CombineInput(TezRawKeyValueIterator kvIter) {
+    this.input = kvIter;
+  }
+
+  /** Currently a no-op. */
+  public void initialize(Configuration conf, Master master) throws IOException,
+      InterruptedException {
+  }
+
+  /**
+   * Advances to the next distinct key.
+   *
+   * @return true if a record for a new key was loaded, false when the
+   *         underlying iterator is exhausted
+   */
+  public boolean hasNext() throws IOException, InterruptedException {
+    // Drain values of the current key that the caller did not consume.
+    while (hasMore && nextKeyIsSame) {
+      nextKeyValue();
+    }
+    if (hasMore) {
+      if (inputKeyCounter != null) {
+        inputKeyCounter.increment(1);
+      }
+      return nextKeyValue();
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Loads the current record of the underlying iterator into key/value,
+   * advances the iterator and records whether the following record has the
+   * same raw key.
+   *
+   * @return false (after clearing key and value) if no record is available
+   */
+  private boolean nextKeyValue() throws IOException, InterruptedException {
+    if (!hasMore) {
+      key = null;
+      value = null;
+      return false;
+    }
+    firstValue = !nextKeyIsSame;
+    DataInputBuffer nextKey = input.getKey();
+    // Keep a private copy of the raw key; it is compared with the following
+    // record's key after the iterator has moved on.
+    currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
+                      nextKey.getLength() - nextKey.getPosition());
+    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
+    key = keyDeserializer.deserialize(key);
+    DataInputBuffer nextVal = input.getValue();
+    // The third argument of reset() is a byte count, while getLength() is an
+    // end offset, so subtract the position exactly as done for the key.
+    buffer.reset(nextVal.getData(), nextVal.getPosition(),
+                 nextVal.getLength() - nextVal.getPosition());
+    value = valueDeserializer.deserialize(value);
+
+    hasMore = input.next();
+    if (hasMore) {
+      nextKey = input.getKey();
+      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
+                                     currentRawKey.getLength(),
+                                     nextKey.getData(),
+                                     nextKey.getPosition(),
+                                     nextKey.getLength() - nextKey.getPosition()
+                                         ) == 0;
+    } else {
+      nextKeyIsSame = false;
+    }
+    // Guarded like inputKeyCounter in hasNext(); the counter may be unset.
+    if (inputValueCounter != null) {
+      inputValueCounter.increment(1);
+    }
+    return true;
+  }
+
+  /** Returns the key loaded by the most recent read. */
+  public Object getNextKey() throws IOException, InterruptedException {
+    return key;
+  }
+
+  /**
+   * Returns the shared Iterable over the values of the current key; every
+   * call returns the same object.
+   */
+  public Iterable getNextValues() throws IOException,
+      InterruptedException {
+    return iterable;
+  }
+
+  /** Returns the progress reported by the underlying iterator. */
+  public float getProgress() throws IOException, InterruptedException {
+    return input.getProgress().getProgress();
+  }
+
+  /** Closes the underlying iterator. */
+  public void close() throws IOException {
+    input.close();
+  }
+
+  /**
+   * Iterator over the values of the current key. It has no state of its
+   * own and drives the enclosing CombineInput forward.
+   */
+  protected class ValueIterator implements Iterator<Object> {
+
+
+    public boolean hasNext() {
+      return firstValue || nextKeyIsSame;
+    }
+
+    public Object next() {
+
+      // if this is the first record, we don't need to advance
+      if (firstValue) {
+        firstValue = false;
+        return value;
+      }
+      // if this isn't the first record and the next key is different, they
+      // can't advance it here.
+      if (!nextKeyIsSame) {
+        throw new NoSuchElementException("iterate past last value");
+      }
+      // otherwise, go to the next key/value pair
+      try {
+        nextKeyValue();
+        return value;
+      } catch (IOException ie) {
+        throw new RuntimeException("next value iterator failed", ie);
+      } catch (InterruptedException ie) {
+        // this is bad, but we can't modify the exception list of java.util;
+        // restore the interrupt flag so callers can still observe it.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("next value iterator interrupted", ie);
+      }
+    }
+
+    public void remove() {
+      throw new UnsupportedOperationException("remove not implemented");
+    }
+  }
+
+
+
+  /** Iterable that always hands out the same ValueIterator instance. */
+  protected class ValueIterable implements Iterable<Object> {
+    private ValueIterator iterator = new ValueIterator();
+    public Iterator<Object> iterator() {
+      return iterator;
+    }
+  }
+
+
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineOutput.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineOutput.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineOutput.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineOutput.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.common.combine;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.common.sort.impl.IFile.Writer;
+import org.apache.tez.engine.records.OutputContext;
+
+public class CombineOutput implements Output {
+
+  private final Writer writer;
+  
+  public CombineOutput(Writer writer) {
+    this.writer = writer;
+  }
+
+  public void initialize(Configuration conf, Master master) throws IOException,
+      InterruptedException {
+    // TODO Auto-generated method stub
+
+  }
+
+  public void write(Object key, Object value) throws IOException,
+      InterruptedException {
+    writer.append(key, value);
+  }
+
+  @Override
+  public OutputContext getOutputContext() {
+    return null;
+  }
+  
+  public void close() throws IOException, InterruptedException {
+    writer.close();
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineOutput.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,127 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common.localshuffle;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskReporter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.sort.impl.TezMerger;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
+
+@SuppressWarnings({"rawtypes"})
+public class LocalShuffle {
+
+  private final TezTask task;
+  private final Configuration conf;
+  private final int tasksInDegree;
+
+  private final Class keyClass;
+  private final Class valClass;
+  private final RawComparator comparator;
+
+  private final FileSystem rfs;
+  private final int sortFactor;
+  
+  private final TezCounter spilledRecordsCounter;
+  private final CompressionCodec codec;
+  private final TezTaskOutput mapOutputFile;
+
+  public LocalShuffle(TezTask task, 
+      Configuration conf,
+      TezTaskReporter reporter
+      ) throws IOException {
+    this.task = task;
+    this.conf = conf;
+    this.keyClass = ConfigUtils.getMapOutputKeyClass(conf);
+    this.valClass = ConfigUtils.getMapOutputValueClass(conf);
+    this.comparator = ConfigUtils.getOutputKeyComparator(conf);
+
+    this.sortFactor =
+        conf.getInt(
+            TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR, 
+            TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR);
+    
+    this.rfs = FileSystem.getLocal(conf).getRaw();
+
+    this.spilledRecordsCounter = 
+        reporter.getCounter(TaskCounter.SPILLED_RECORDS);
+    
+    // compression
+    if (ConfigUtils.getCompressMapOutput(conf)) {
+      Class<? extends CompressionCodec> codecClass =
+          ConfigUtils.getMapOutputCompressorClass(conf, DefaultCodec.class);
+      this.codec = ReflectionUtils.newInstance(codecClass, conf);
+    } else {
+      this.codec = null;
+    }
+
+    this.tasksInDegree = 
+        conf.getInt(
+            TezJobConfig.TEZ_ENGINE_TASK_INDEGREE, 
+            TezJobConfig.DEFAULT_TEZ_ENGINE_TASK_INDEGREE);
+
+    // Always local
+    this.mapOutputFile = new TezLocalTaskOutputFiles();
+    this.mapOutputFile.setConf(conf);
+
+  }
+  
+  public TezRawKeyValueIterator run() throws IOException {
+    // Copy is complete, obviously! 
+    this.task.getProgress().addPhase("copy", 0.33f).complete();
+
+    // Merge
+    return TezMerger.merge(conf, rfs, 
+        keyClass, valClass,
+        codec, 
+        getMapFiles(),
+        false, 
+        sortFactor,
+        new Path(task.getTaskAttemptId().toString()), 
+        comparator,
+        task.getTaskReporter(), spilledRecordsCounter, null, null);
+  }
+  
+  private Path[] getMapFiles() 
+  throws IOException {
+    List<Path> fileList = new ArrayList<Path>();
+      // for local jobs
+      for(int i = 0; i < tasksInDegree; ++i) {
+        fileList.add(mapOutputFile.getInputFile(i));
+      }
+      
+    return fileList.toArray(new Path[0]);
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/DelegationTokenRenewal.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/DelegationTokenRenewal.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/DelegationTokenRenewal.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/DelegationTokenRenewal.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.security;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+
+/**
+ * Static registry that schedules renewal of managed delegation tokens on a
+ * shared daemon {@link Timer} and queues tokens for cancellation on a
+ * dedicated daemon thread once their job is removed.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class DelegationTokenRenewal {
+  private static final Log LOG = LogFactory.getLog(DelegationTokenRenewal.class);
+  public static final String SCHEME = "hdfs";
+
+  /**
+   * Book-keeping record for one delegation token that is being renewed.
+   * Equality and hash code are derived from the token alone.
+   */
+  private static class DelegationTokenToRenew {
+    public final Token<?> token;
+    public final ApplicationId jobId;
+    public final Configuration conf;
+    public long expirationDate;
+    public TimerTask timerTask;
+
+    /**
+     * @throws IllegalArgumentException if the job id, token or configuration
+     *         is {@code null}
+     */
+    public DelegationTokenToRenew(
+        ApplicationId jId, Token<?> t,
+        Configuration newConf, long newExpirationDate) {
+      if (t == null || jId == null || newConf == null) {
+        throw new IllegalArgumentException("invalid params for Renew Token" +
+            ";t=" + t + ";j=" + jId + ";c=" + newConf);
+      }
+      token = t;
+      jobId = jId;
+      conf = newConf;
+      expirationDate = newExpirationDate;
+      timerTask = null;
+    }
+
+    public void setTimerTask(TimerTask tTask) {
+      timerTask = tTask;
+    }
+
+    @Override
+    public String toString() {
+      return token + ";exp=" + expirationDate;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      } else if (obj == null || getClass() != obj.getClass()) {
+        return false;
+      } else {
+        return token.equals(((DelegationTokenToRenew) obj).token);
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      return token.hashCode();
+    }
+  }
+
+  // global single timer (daemon)
+  private static final Timer renewalTimer = new Timer(true);
+
+  // delegation token canceler thread, started during class initialization
+  private static final DelegationTokenCancelThread dtCancelThread =
+    new DelegationTokenCancelThread();
+  static {
+    dtCancelThread.start();
+  }
+
+  // all tokens currently registered for renewal, kept in a synchronized set
+  private static final Set<DelegationTokenToRenew> delegationTokens =
+    Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());
+
+  /**
+   * Daemon thread that takes tokens from a queue and cancels them one at a
+   * time as the login user.
+   */
+  private static class DelegationTokenCancelThread extends Thread {
+    /** A token together with the configuration used to cancel it. */
+    private static class TokenWithConf {
+      final Token<?> token;
+      final Configuration conf;
+
+      TokenWithConf(Token<?> token, Configuration conf) {
+        this.token = token;
+        this.conf = conf;
+      }
+    }
+
+    private final LinkedBlockingQueue<TokenWithConf> queue =
+      new LinkedBlockingQueue<TokenWithConf>();
+
+    public DelegationTokenCancelThread() {
+      super("Delegation Token Canceler");
+      setDaemon(true);
+    }
+
+    /**
+     * Queues a token for cancellation, retrying every 100 ms while the queue
+     * refuses the element.
+     *
+     * @throws RuntimeException if the calling thread is interrupted while
+     *         waiting to retry; the interrupt flag is left set
+     */
+    public void cancelToken(Token<?> token,
+        Configuration conf) {
+      TokenWithConf tokenWithConf = new TokenWithConf(token, conf);
+      while (!queue.offer(tokenWithConf)) {
+        LOG.warn("Unable to add token " + token + " for cancellation. " +
+            "Will retry..");
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          // keep the interruption visible to the caller instead of losing it
+          // inside the unchecked wrapper
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    @Override
+    public void run() {
+      while (true) {
+        TokenWithConf tokenWithConf = null;
+        try {
+          tokenWithConf = queue.take();
+          final TokenWithConf current = tokenWithConf;
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Canceling token " + tokenWithConf.token.getService());
+          }
+          // need to use doAs so that http can find the kerberos tgt
+          UserGroupInformation.getLoginUser().doAs(
+              new PrivilegedExceptionAction<Void>() {
+
+                @Override
+                public Void run() throws Exception {
+                  current.token.cancel(current.conf);
+                  return null;
+                }
+              });
+        } catch (IOException e) {
+          // a failed cancellation is logged and the loop moves on
+          LOG.warn("Failed to cancel token " + tokenWithConf.token + " " +
+              StringUtils.stringifyException(e));
+        } catch (InterruptedException ie) {
+          // interruption ends the canceler thread
+          return;
+        } catch (Throwable t) {
+          // anything else terminates the whole JVM
+          LOG.warn("Got exception " + StringUtils.stringifyException(t) +
+                   ". Exiting..");
+          System.exit(-1);
+        }
+      }
+    }
+  }
+
+  // adding token
+  private static void addTokenToList(DelegationTokenToRenew t) {
+    delegationTokens.add(t);
+  }
+
+  /**
+   * Registers every managed token found in {@code ts} and schedules its
+   * first renewal immediately. Tokens for which {@code isManaged()} is false
+   * are skipped.
+   *
+   * @param jobId application the tokens belong to
+   * @param ts    credentials holding the tokens; {@code null} is a no-op
+   * @param conf  configuration later used to renew and cancel the tokens
+   * @throws IOException declared by the signature
+   */
+  public static synchronized void registerDelegationTokensForRenewal(
+      ApplicationId jobId, Credentials ts, Configuration conf) throws IOException {
+    if (ts == null) {
+      return; //nothing to add
+    }
+
+    Collection <Token<?>> tokens = ts.getAllTokens();
+    long now = System.currentTimeMillis();
+
+    for (Token<?> t : tokens) {
+      // first renew happens immediately
+      if (t.isManaged()) {
+        DelegationTokenToRenew dtr = new DelegationTokenToRenew(jobId, t, conf,
+            now);
+
+        addTokenToList(dtr);
+
+        setTimerForTokenRenewal(dtr, true);
+        LOG.info("registering token for renewal for service =" + t.getService()
+            + " and jobID = " + jobId);
+      }
+    }
+  }
+
+  /**
+   * Task - to renew a token
+   *
+   */
+  private static class RenewalTimerTask extends TimerTask {
+    private final DelegationTokenToRenew dttr;
+
+    RenewalTimerTask(DelegationTokenToRenew t) {  dttr = t;  }
+
+    /**
+     * Renews the token as the login user, stores the new expiration date and
+     * schedules the following renewal. On any exception the token is dropped
+     * from the registry and is not rescheduled.
+     */
+    @Override
+    public void run() {
+      Token<?> token = dttr.token;
+      try {
+        // need to use doAs so that http can find the kerberos tgt
+        dttr.expirationDate = UserGroupInformation.getLoginUser().doAs(
+            new PrivilegedExceptionAction<Long>() {
+
+              @Override
+              public Long run() throws Exception {
+                return dttr.token.renew(dttr.conf);
+              }
+            });
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("renewing for:" + token.getService() + ";newED="
+              + dttr.expirationDate);
+        }
+        setTimerForTokenRenewal(dttr, false);// set the next one
+      } catch (Exception e) {
+        LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
+        removeFailedDelegationToken(dttr);
+      }
+    }
+  }
+
+  /**
+   * Schedules the next renewal of the given token on the shared timer.
+   *
+   * @param token     the token record to schedule
+   * @param firstTime if {@code true} the renewal is scheduled for "now";
+   *                  otherwise it is scheduled after nine tenths of the time
+   *                  remaining until {@code token.expirationDate}
+   */
+  private static void setTimerForTokenRenewal(
+      DelegationTokenToRenew token, boolean firstTime) {
+
+    // calculate timer time
+    long now = System.currentTimeMillis();
+    long renewIn;
+    if (firstTime) {
+      renewIn = now;
+    } else {
+      long expiresIn = (token.expirationDate - now);
+      renewIn = now + expiresIn - expiresIn/10; // little before expiration
+    }
+
+    // need to create new timer every time
+    TimerTask tTask = new RenewalTimerTask(token);
+    token.setTimerTask(tTask); // keep reference to the timer
+
+    renewalTimer.schedule(token.timerTask, new Date(renewIn));
+  }
+
+  /**
+   * Cancels the shared renewal timer and forgets all registered tokens.
+   * A cancelled {@link Timer} rejects further scheduling, so renewals cannot
+   * be scheduled through this class afterwards.
+   */
+  static public void close() {
+    renewalTimer.cancel();
+    delegationTokens.clear();
+  }
+
+  // cancel a token
+  private static void cancelToken(DelegationTokenToRenew t) {
+    dtCancelThread.cancelToken(t.token, t.conf);
+  }
+
+  /**
+   * Removes a token whose renewal failed from the registry and cancels its
+   * pending timer task. The token itself is not queued for cancellation.
+   *
+   * @param t the token record to drop
+   */
+  private static void removeFailedDelegationToken(DelegationTokenToRenew t) {
+    ApplicationId jobId = t.jobId;
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("removing failed delegation token for jobid=" + jobId +
+          ";t=" + t.token.getService());
+    }
+    delegationTokens.remove(t);
+    // cancel the timer
+    if (t.timerTask != null) {
+      t.timerTask.cancel();
+    }
+  }
+
+  /**
+   * Removes every token registered for the given job: its timer task is
+   * cancelled, the token is queued for cancellation and the record is taken
+   * out of the registry.
+   *
+   * @param jobId the job whose tokens are removed
+   */
+  public static void removeDelegationTokenRenewalForJob(ApplicationId jobId) {
+    // iteration over a synchronized set must hold the set's own lock
+    synchronized (delegationTokens) {
+      Iterator<DelegationTokenToRenew> it = delegationTokens.iterator();
+      while (it.hasNext()) {
+        DelegationTokenToRenew dttr = it.next();
+        if (dttr.jobId.equals(jobId)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("removing delegation token for jobid=" + jobId +
+                ";t=" + dttr.token.getService());
+          }
+
+          // cancel the timer
+          if (dttr.timerTask != null) {
+            dttr.timerTask.cancel();
+          }
+
+          // cancel the token
+          cancelToken(dttr);
+
+          it.remove();
+        }
+      }
+    }
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/DelegationTokenRenewal.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenIdentifier.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenIdentifier.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenIdentifier.java (added)
+++ incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenIdentifier.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.security;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+
+/**
+ * Token identifier for a job token; the job id is its only serialized field.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class JobTokenIdentifier extends TokenIdentifier {
+  /** Kind reported by {@link #getKind()} and by {@link Renewer}. */
+  public final static Text KIND_NAME = new Text("mapreduce.job");
+
+  private Text jobid;
+
+  /**
+   * Creates an identifier with an empty job id, ready for
+   * {@link #readFields(DataInput)}.
+   */
+  public JobTokenIdentifier() {
+    this.jobid = new Text();
+  }
+
+  /**
+   * Creates an identifier for the given job id.
+   * @param jobid the jobid to use
+   */
+  public JobTokenIdentifier(Text jobid) {
+    this.jobid = jobid;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public Text getKind() {
+    return KIND_NAME;
+  }
+
+  /**
+   * Builds a remote user named after the job id.
+   *
+   * @return the user, or {@code null} when the job id is {@code null} or
+   *         empty
+   */
+  @Override
+  public UserGroupInformation getUser() {
+    if (jobid == null) {
+      return null;
+    }
+    final String name = jobid.toString();
+    if (name.length() == 0) {
+      return null;
+    }
+    return UserGroupInformation.createRemoteUser(name);
+  }
+
+  /**
+   * Get the jobid
+   * @return the jobid
+   */
+  public Text getJobId() {
+    return this.jobid;
+  }
+
+  /** Reads the job id from {@code in}. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.jobid.readFields(in);
+  }
+
+  /** Writes the job id to {@code out}. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    this.jobid.write(out);
+  }
+
+  /** Trivial renewer registered for {@link #KIND_NAME} tokens. */
+  @InterfaceAudience.Private
+  public static class Renewer extends Token.TrivialRenewer {
+    @Override
+    protected Text getKind() {
+      return KIND_NAME;
+    }
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-engine/src/main/java/org/apache/tez/engine/common/security/JobTokenIdentifier.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message