tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-436. Send regular progress updates from tez task to AM. (hitesh)
Date Fri, 13 Sep 2013 00:41:22 GMT
Updated Branches:
  refs/heads/TEZ-398 b8a66679a -> d59d6620c


TEZ-436. Send regular progress updates from tez task to AM. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/d59d6620
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/d59d6620
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/d59d6620

Branch: refs/heads/TEZ-398
Commit: d59d6620c1121d421ad16cf7df9490c25371c6e6
Parents: b8a6667
Author: Hitesh Shah <hitesh@apache.org>
Authored: Thu Sep 12 17:40:57 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Thu Sep 12 17:40:57 2013 -0700

----------------------------------------------------------------------
 .../apache/hadoop/mapred/YarnTezDagChild.java   |  16 ++
 .../org/apache/tez/engine/newapi/Input.java     |  25 ++-
 .../org/apache/tez/engine/newapi/Output.java    |  23 ++-
 .../org/apache/tez/engine/newapi/Processor.java |  12 +-
 .../tez/engine/newapi/TezProcessorContext.java  |   9 +
 .../newapi/events/TaskStatusUpdateEvent.java    |  70 ++++++++
 .../tez/engine/newapi/impl/EventType.java       |   3 +-
 .../apache/tez/engine/newapi/impl/TezEvent.java | 180 ++++++++++---------
 .../newapi/impl/TezProcessorContextImpl.java    |   9 +-
 .../engine/newapi/impl/TezTaskContextImpl.java  |   2 +-
 .../tez/engine/newapi/impl/TezUmbilical.java    |   3 +
 .../LogicalIOProcessorRuntimeTask.java          |  20 +--
 .../tez/engine/newruntime/RuntimeTask.java      |  16 ++
 .../tez/mapreduce/newoutput/SimpleOutput.java   |   1 -
 14 files changed, 259 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d59d6620/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 66227b0..10170eb 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -75,6 +75,7 @@ import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.common.security.TokenCache;
 import org.apache.tez.engine.newapi.events.TaskAttemptCompletedEvent;
 import org.apache.tez.engine.newapi.events.TaskAttemptFailedEvent;
+import org.apache.tez.engine.newapi.events.TaskStatusUpdateEvent;
 import org.apache.tez.engine.newapi.impl.EventMetaData;
 import org.apache.tez.engine.newapi.impl.InputSpec;
 import org.apache.tez.engine.newapi.impl.OutputSpec;
@@ -218,15 +219,24 @@ public class YarnTezDagChild {
   }
 
   private static void heartbeat() throws TezException, IOException {
+    TezEvent updateEvent = null;
     try {
       taskLock.readLock().lock();
       if (currentTask == null) {
         return;
+      } else {
+        updateEvent = new TezEvent(new TaskStatusUpdateEvent(
+            currentTask.getCounters(), currentTask.getProgress()),
+            new EventMetaData(EventProducerConsumerType.SYSTEM,
+                "", "", currentTaskAttemptID));
       }
     } finally {
       taskLock.readLock().unlock();
     }
     List<TezEvent> events = new ArrayList<TezEvent>();
+    if (updateEvent != null) {
+      events.add(updateEvent);
+    }
     eventsToSend.drainTo(events);
     long reqId = requestCounter.incrementAndGet();
     TezHeartbeatRequest request = new TezHeartbeatRequest(reqId, events,
@@ -333,6 +343,12 @@ public class YarnTezDagChild {
           // TODONEWTEZ System.exit ?
         }
       }
+
+      @Override
+      public boolean canCommit(TezTaskAttemptID taskAttemptID)
+          throws IOException {
+        return umbilical.canCommit(taskAttemptID);
+      }
     };
 
     // report non-pid to application master

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d59d6620/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Input.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Input.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Input.java
index 9552d4d..5608a85 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Input.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Input.java
@@ -18,13 +18,12 @@
 
 package org.apache.tez.engine.newapi;
 
-import java.io.IOException;
 import java.util.List;
 
 /**
  * Represents an input through which a TezProcessor receives data on an edge.
  * </p>
- * 
+ *
  * <code>Input</code> classes must have a 0 argument public constructor for Tez
  * to construct the <code>Input</code>. Tez will take care of initializing and
  * closing the Input after a {@link Processor} completes. </p>
@@ -33,29 +32,29 @@ public interface Input {
 
   /**
    * Initializes the <code>Input</code>.
-   * 
+   *
    * @param inputContext
    *          the {@link TezInputContext}
    * @return
-   * @throws IOException
+   * @throws Exception
    *           if an error occurs
    */
   public List<Event> initialize(TezInputContext inputContext)
-      throws IOException;
+      throws Exception;
 
   /**
    * Gets an instance of the {@link Reader} for this <code>Output</code>
-   * 
+   *
    * @return
-   * @throws IOException
+   * @throws Exception
    *           if an error occurs
    */
-  public Reader getReader() throws IOException;
+  public Reader getReader() throws Exception;
 
   /**
    * Handles user and system generated {@link Events}s, which typically carry
    * information such as an output being available on the previous vertex.
-   * 
+   *
    * @param inputEvents
    *          the list of {@link Event}s
    */
@@ -63,10 +62,10 @@ public interface Input {
 
   /**
    * Closes the <code>Input</code>
-   * 
+   *
    * @return
-   * @throws IOException
+   * @throws Exception
    *           if an error occurs
    */
-  public List<Event> close() throws IOException;
-}
\ No newline at end of file
+  public List<Event> close() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d59d6620/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Output.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Output.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Output.java
index 4c036e6..5a6b5da 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Output.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Output.java
@@ -18,13 +18,12 @@
 
 package org.apache.tez.engine.newapi;
 
-import java.io.IOException;
 import java.util.List;
 
 /**
  * Represents an Output through which a TezProcessor writes information on an
  * edge. </p>
- * 
+ *
  * <code>Output</code> implementations must have a 0 argument public constructor
  * for Tez to construct the <code>Output</code>. Tez will take care of
  * initializing and closing the Input after a {@link Processor} completes. </p>
@@ -33,29 +32,29 @@ public interface Output {
 
   /**
    * Initializes the <code>Output</code>
-   * 
+   *
    * @param outputContext
    *          the {@link TezOutputContext}
    * @return
-   * @throws IOException
+   * @throws Exception
    *           if an error occurs
    */
   public List<Event> initialize(TezOutputContext outputContext)
-      throws IOException;
+      throws Exception;
 
   /**
    * Gets an instance of the {@link Writer} in an <code>Output</code>
-   * 
+   *
    * @return
-   * @throws IOException
+   * @throws Exception
    *           if an error occurs
    */
-  public Writer getWriter() throws IOException;
+  public Writer getWriter() throws Exception;
 
   /**
    * Handles user and system generated {@link Events}s, which typically carry
    * information such as a downstream vertex being ready to consume input.
-   * 
+   *
    * @param outputEvents
    *          the list of {@link Event}s
    */
@@ -63,10 +62,10 @@ public interface Output {
 
   /**
    * Closes the <code>Output</code>
-   * 
+   *
    * @return
-   * @throws IOException
+   * @throws Exception
    *           if an error occurs
    */
-  public List<Event> close() throws IOException;
+  public List<Event> close() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d59d6620/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Processor.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Processor.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Processor.java
index 3135cf1..3e71b58 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Processor.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/Processor.java
@@ -32,17 +32,17 @@ public interface Processor {
 
   /**
    * Initializes the <code>Processor</code>
-   * 
+   *
    * @param processorContext
    * @throws IOException
    *           if an error occurs
    */
   public void initialize(TezProcessorContext processorContext)
-      throws IOException;
+      throws Exception;
 
   /**
    * Handles user and system generated {@link Events}s.
-   * 
+   *
    * @param processorEvents
    *          the list of {@link Event}s
    */
@@ -50,9 +50,9 @@ public interface Processor {
 
   /**
    * Closes the <code>Processor</code>
-   * 
+   *
    * @throws IOException
    *           if an error occurs
    */
-  public void close() throws IOException;
-}
\ No newline at end of file
+  public void close() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d59d6620/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezProcessorContext.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezProcessorContext.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezProcessorContext.java
index dbf387a..5b44f23 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezProcessorContext.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezProcessorContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.engine.newapi;
 
+import java.io.IOException;
+
 /**
  * Context handle for the Processor to initialize itself.
  */
@@ -29,4 +31,11 @@ public interface TezProcessorContext extends TezTaskContext {
    */
   public void setProgress(float progress);
 
+  /**
+   * Check whether this attempt can commit its output
+   * @return true if commit allowed
+   * @throws IOException
+   */
+  public boolean canCommit() throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d59d6620/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskStatusUpdateEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskStatusUpdateEvent.java
new file mode 100644
index 0000000..0f09867
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskStatusUpdateEvent.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.events;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.engine.newapi.Event;
+
+public class TaskStatusUpdateEvent extends Event implements Writable {
+
+  private TezCounters tezCounters;
+  private float progress;
+
+  public TaskStatusUpdateEvent() {
+  }
+
+  public TaskStatusUpdateEvent(TezCounters tezCounters, float progress) {
+    this.tezCounters = tezCounters;
+    this.progress = progress;
+  }
+
+  public TezCounters getCounters() {
+    return tezCounters;
+  }
+
+  public float getProgress() {
+    return progress;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeFloat(progress);
+    if (tezCounters != null) {
+      out.writeBoolean(true);
+      tezCounters.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    progress = in.readFloat();
+    if (in.readBoolean()) {
+      tezCounters = new TezCounters();
+      tezCounters.readFields(in);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d59d6620/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java
index 51a1b24..87d6665 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java
@@ -24,5 +24,6 @@ public enum EventType {
   DATA_MOVEMENT_EVENT,
   INPUT_READ_ERROR_EVENT,
   INPUT_FAILED_EVENT,
-  INTPUT_INFORMATION_EVENT
+  INTPUT_INFORMATION_EVENT,
+  TASK_STATUS_UPDATE_EVENT
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d59d6620/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
index fdb8754..02e6112 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
@@ -37,6 +37,7 @@ import org.apache.tez.engine.newapi.events.InputInformationEvent;
 import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
 import org.apache.tez.engine.newapi.events.TaskAttemptCompletedEvent;
 import org.apache.tez.engine.newapi.events.TaskAttemptFailedEvent;
+import org.apache.tez.engine.newapi.events.TaskStatusUpdateEvent;
 
 import com.google.protobuf.ByteString;
 
@@ -68,6 +69,8 @@ public class TezEvent implements Writable {
       eventType = EventType.INTPUT_INFORMATION_EVENT;
     } else if (event instanceof InputFailedEvent) {
       eventType = EventType.INPUT_FAILED_EVENT;
+    } else if (event instanceof TaskStatusUpdateEvent) {
+      eventType = EventType.TASK_STATUS_UPDATE_EVENT;
     } else {
       throw new TezUncheckedException("Unknown event, event="
           + event.getClass().getName());
@@ -104,51 +107,57 @@ public class TezEvent implements Writable {
       return;
     }
     out.writeBoolean(true);
-    byte[] eventBytes = null;
-    switch (eventType) {
-    case DATA_MOVEMENT_EVENT:
-      DataMovementEvent dmEvt = (DataMovementEvent) event;
-      eventBytes = DataMovementEventProto.newBuilder()
-        .setSourceIndex(dmEvt.getSourceIndex())
-        .setTargetIndex(dmEvt.getTargetIndex())
-        .setUserPayload(ByteString.copyFrom(dmEvt.getUserPayload()))
-        .build().toByteArray();
-      break;
-    case INPUT_READ_ERROR_EVENT:
-      InputReadErrorEvent ideEvt = (InputReadErrorEvent) event;
-      eventBytes = InputReadErrorEventProto.newBuilder()
-          .setIndex(ideEvt.getIndex())
-          .setDiagnostics(ideEvt.getDiagnostics())
-          .build().toByteArray();
-      break;
-    case TASK_ATTEMPT_FAILED_EVENT:
-      TaskAttemptFailedEvent tfEvt = (TaskAttemptFailedEvent) event;
-      eventBytes = TaskAttemptFailedEventProto.newBuilder()
-          .setDiagnostics(tfEvt.getDiagnostics())
-          .build().toByteArray();
-      break;
-    case TASK_ATTEMPT_COMPLETED_EVENT:
-      eventBytes = TaskAttemptCompletedEventProto.newBuilder()
-          .build().toByteArray();
-      break;
-    case INPUT_FAILED_EVENT:
-      InputFailedEvent ifEvt = (InputFailedEvent) event;
-      eventBytes = InputFailedEventProto.newBuilder()
-          .setSourceIndex(ifEvt.getSourceIndex())
-          .setTargetIndex(ifEvt.getTargetIndex())
-          .setVersion(ifEvt.getVersion()).build().toByteArray();
-    case INTPUT_INFORMATION_EVENT:
-      InputInformationEvent iEvt = (InputInformationEvent) event;
-      eventBytes = InputInformationEventProto.newBuilder()
-          .setUserPayload(ByteString.copyFrom(iEvt.getUserPayload()))
+    if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
+      // TODO NEWTEZ convert to PB
+      TaskStatusUpdateEvent sEvt = (TaskStatusUpdateEvent) event;
+      sEvt.write(out);
+    } else {
+      byte[] eventBytes = null;
+      switch (eventType) {
+      case DATA_MOVEMENT_EVENT:
+        DataMovementEvent dmEvt = (DataMovementEvent) event;
+        eventBytes = DataMovementEventProto.newBuilder()
+          .setSourceIndex(dmEvt.getSourceIndex())
+          .setTargetIndex(dmEvt.getTargetIndex())
+          .setUserPayload(ByteString.copyFrom(dmEvt.getUserPayload()))
           .build().toByteArray();
-    default:
-      throw new TezUncheckedException("Unknown TezEvent"
-         + ", type=" + eventType);
+        break;
+      case INPUT_READ_ERROR_EVENT:
+        InputReadErrorEvent ideEvt = (InputReadErrorEvent) event;
+        eventBytes = InputReadErrorEventProto.newBuilder()
+            .setIndex(ideEvt.getIndex())
+            .setDiagnostics(ideEvt.getDiagnostics())
+            .build().toByteArray();
+        break;
+      case TASK_ATTEMPT_FAILED_EVENT:
+        TaskAttemptFailedEvent tfEvt = (TaskAttemptFailedEvent) event;
+        eventBytes = TaskAttemptFailedEventProto.newBuilder()
+            .setDiagnostics(tfEvt.getDiagnostics())
+            .build().toByteArray();
+        break;
+      case TASK_ATTEMPT_COMPLETED_EVENT:
+        eventBytes = TaskAttemptCompletedEventProto.newBuilder()
+            .build().toByteArray();
+        break;
+      case INPUT_FAILED_EVENT:
+        InputFailedEvent ifEvt = (InputFailedEvent) event;
+        eventBytes = InputFailedEventProto.newBuilder()
+            .setSourceIndex(ifEvt.getSourceIndex())
+            .setTargetIndex(ifEvt.getTargetIndex())
+            .setVersion(ifEvt.getVersion()).build().toByteArray();
+      case INTPUT_INFORMATION_EVENT:
+        InputInformationEvent iEvt = (InputInformationEvent) event;
+        eventBytes = InputInformationEventProto.newBuilder()
+            .setUserPayload(ByteString.copyFrom(iEvt.getUserPayload()))
+            .build().toByteArray();
+      default:
+        throw new TezUncheckedException("Unknown TezEvent"
+           + ", type=" + eventType);
+      }
+      out.writeInt(eventType.ordinal());
+      out.writeInt(eventBytes.length);
+      out.write(eventBytes);
     }
-    out.writeInt(eventType.ordinal());
-    out.writeInt(eventBytes.length);
-    out.write(eventBytes);
   }
 
   private void deserializeEvent(DataInput in) throws IOException {
@@ -157,45 +166,52 @@ public class TezEvent implements Writable {
       return;
     }
     eventType = EventType.values()[in.readInt()];
-    int eventBytesLen = in.readInt();
-    byte[] eventBytes = new byte[eventBytesLen];
-    in.readFully(eventBytes);
-    switch (eventType) {
-    case DATA_MOVEMENT_EVENT:
-      DataMovementEventProto dmProto = DataMovementEventProto.parseFrom(eventBytes);
-      event = new DataMovementEvent(dmProto.getSourceIndex(),
-          dmProto.getTargetIndex(),
-          dmProto.getUserPayload().toByteArray());
-      break;
-    case INPUT_READ_ERROR_EVENT:
-      InputReadErrorEventProto ideProto =
-          InputReadErrorEventProto.parseFrom(eventBytes);
-      event = new InputReadErrorEvent(ideProto.getDiagnostics(),
-          ideProto.getIndex(), ideProto.getVersion());
-      break;
-    case TASK_ATTEMPT_FAILED_EVENT:
-      TaskAttemptFailedEventProto tfProto =
-          TaskAttemptFailedEventProto.parseFrom(eventBytes);
-      event = new TaskAttemptFailedEvent(tfProto.getDiagnostics());
-      break;
-    case TASK_ATTEMPT_COMPLETED_EVENT:
-      event = new TaskAttemptCompletedEvent();
-      break;
-    case INPUT_FAILED_EVENT:
-      InputFailedEventProto ifProto =
-          InputFailedEventProto.parseFrom(eventBytes);
-      event = new InputFailedEvent(ifProto.getSourceIndex(),
-          ifProto.getTargetIndex(), ifProto.getVersion());
-      break;
-    case INTPUT_INFORMATION_EVENT:
-      InputInformationEventProto infoProto =
-          InputInformationEventProto.parseFrom(eventBytes);
-      event = new InputInformationEvent(
-          infoProto.getUserPayload().toByteArray());
-      break;
-    default:
-      throw new TezUncheckedException("Unknown TezEvent"
-         + ", type=" + eventType);
+    if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
+      // TODO NEWTEZ convert to PB
+      event = new TaskStatusUpdateEvent();
+      ((TaskStatusUpdateEvent)event).readFields(in);
+    } else {
+      int eventBytesLen = in.readInt();
+      byte[] eventBytes = new byte[eventBytesLen];
+      in.readFully(eventBytes);
+      switch (eventType) {
+      case DATA_MOVEMENT_EVENT:
+        DataMovementEventProto dmProto =
+            DataMovementEventProto.parseFrom(eventBytes);
+        event = new DataMovementEvent(dmProto.getSourceIndex(),
+            dmProto.getTargetIndex(),
+            dmProto.getUserPayload().toByteArray());
+        break;
+      case INPUT_READ_ERROR_EVENT:
+        InputReadErrorEventProto ideProto =
+            InputReadErrorEventProto.parseFrom(eventBytes);
+        event = new InputReadErrorEvent(ideProto.getDiagnostics(),
+            ideProto.getIndex(), ideProto.getVersion());
+        break;
+      case TASK_ATTEMPT_FAILED_EVENT:
+        TaskAttemptFailedEventProto tfProto =
+            TaskAttemptFailedEventProto.parseFrom(eventBytes);
+        event = new TaskAttemptFailedEvent(tfProto.getDiagnostics());
+        break;
+      case TASK_ATTEMPT_COMPLETED_EVENT:
+        event = new TaskAttemptCompletedEvent();
+        break;
+      case INPUT_FAILED_EVENT:
+        InputFailedEventProto ifProto =
+            InputFailedEventProto.parseFrom(eventBytes);
+        event = new InputFailedEvent(ifProto.getSourceIndex(),
+            ifProto.getTargetIndex(), ifProto.getVersion());
+        break;
+      case INTPUT_INFORMATION_EVENT:
+        InputInformationEventProto infoProto =
+            InputInformationEventProto.parseFrom(eventBytes);
+        event = new InputInformationEvent(
+            infoProto.getUserPayload().toByteArray());
+        break;
+      default:
+        throw new TezUncheckedException("Unknown TezEvent"
+           + ", type=" + eventType);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d59d6620/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
index da17468..f2f830c 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
@@ -19,6 +19,7 @@
 package org.apache.tez.engine.newapi.impl;
 
 import java.nio.ByteBuffer;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -69,8 +70,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
 
   @Override
   public void setProgress(float progress) {
-    // TODO Auto-generated method stub
-
+    runtimeTask.setProgress(progress);
   }
 
   @Override
@@ -78,4 +78,9 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
     super.signalFatalError(exception, message, sourceInfo);
   }
 
+  @Override
+  public boolean canCommit() throws IOException {
+    return tezUmbilical.canCommit(this.taskAttemptID);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d59d6620/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
index 7a592ae..d8e68a3 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
@@ -37,7 +37,7 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
 
   private final Configuration conf;
   protected final String taskVertexName;
-  private final TezTaskAttemptID taskAttemptID;
+  protected final TezTaskAttemptID taskAttemptID;
   private final TezCounters counters;
   private String[] workDirs;
   protected String uniqueIdentifier;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d59d6620/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
index b88dc63..5889622 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.engine.newapi.impl;
 
+import java.io.IOException;
 import java.util.Collection;
 
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -30,4 +31,6 @@ public interface TezUmbilical {
       String diagnostics,
       EventMetaData sourceInfo);
 
+  public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d59d6620/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index b03d3e1..390b0a7 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -32,7 +32,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.Token;
-import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
@@ -77,8 +76,6 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   private final ProcessorDescriptor processorDescriptor;
   private final LogicalIOProcessor processor;
 
-  private final TezCounters tezCounters;
-  
   private final Map<String, ByteBuffer> serviceConsumerMetadata;
 
   private Map<String, LogicalInput> inputMap;
@@ -89,8 +86,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
 
   private Map<String, List<Event>> closeInputEventMap;
   private Map<String, List<Event>> closeOutputEventMap;
-  
-  
+
+
 
   public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec,
       Configuration tezConf, TezUmbilical tezUmbilical,
@@ -107,14 +104,13 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     this.outputs = createOutputs(outputSpecs);
     this.processorDescriptor = taskSpec.getProcessorDescriptor();
     this.processor = createProcessor(processorDescriptor);
-    this.tezCounters = new TezCounters();
     this.serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
     this.serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
         ShuffleUtils.convertJobTokenToBytes(jobToken));
     this.state = State.NEW;
   }
 
-  public void initialize() throws IOException {
+  public void initialize() throws Exception {
     Preconditions.checkState(this.state == State.NEW, "Already initialized");
     this.state = State.INITED;
     inputMap = new LinkedHashMap<String, LogicalInput>(inputs.size());
@@ -159,7 +155,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     return initOutputEventMap;
   }
 
-  public void run() throws IOException {
+  public void run() throws Exception {
     synchronized (this.state) {
       Preconditions.checkState(this.state == State.INITED,
           "Can only run while in INITED state. Current: " + this.state);
@@ -169,7 +165,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     lioProcessor.run(inputMap, outputMap);
   }
 
-  public void close() throws IOException {
+  public void close() throws Exception {
     Preconditions.checkState(this.state == State.RUNNING,
         "Can only run while in RUNNING state. Current: " + this.state);
     this.state=State.CLOSED;
@@ -205,7 +201,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   private List<Event> initializeInput(Input input, InputSpec inputSpec)
-      throws IOException {
+      throws Exception {
     TezInputContext tezInputContext = createInputContext(inputSpec);
     if (input instanceof LogicalInput) {
       ((LogicalInput) input).setNumPhysicalInputs(inputSpec
@@ -215,7 +211,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   }
 
   private List<Event> initializeOutput(Output output, OutputSpec outputSpec)
-      throws IOException {
+      throws Exception {
     TezOutputContext tezOutputContext = createOutputContext(outputSpec);
     if (output instanceof LogicalOutput) {
       ((LogicalOutput) output).setNumPhysicalOutputs(outputSpec
@@ -224,7 +220,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     return output.initialize(tezOutputContext);
   }
 
-  private void initializeLogicalIOProcessor() throws IOException {
+  private void initializeLogicalIOProcessor() throws Exception {
     TezProcessorContext processorContext = createProcessorContext();
     processor.initialize(processorContext);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d59d6620/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
index 479f917..045d1c6 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/RuntimeTask.java
@@ -20,11 +20,15 @@ package org.apache.tez.engine.newruntime;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.tez.common.counters.TezCounters;
+
 public abstract class RuntimeTask {
 
   protected AtomicBoolean hasFatalError = new AtomicBoolean(false);
   protected Throwable fatalError = null;
   protected String fatalErrorMessage = null;
+  protected float progress;
+  protected final TezCounters tezCounters = new TezCounters();
 
   protected enum State {
     NEW, INITED, RUNNING, CLOSED;
@@ -42,4 +46,16 @@ public abstract class RuntimeTask {
     return hasFatalError.get();
   }
 
+  public synchronized void setProgress(float progress) {
+    this.progress = progress;
+  }
+
+  public synchronized float getProgress() {
+    return this.progress;
+  }
+
+  public TezCounters getCounters() {
+    return this.tezCounters;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d59d6620/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
index 6634429..0d774d2 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
@@ -16,7 +16,6 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.engine.newapi.Event;
 import org.apache.tez.engine.newapi.KVWriter;
 import org.apache.tez.engine.newapi.LogicalOutput;


Mime
View raw message