tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject [1/3] TEZ-799. Generate data to be used for recovery. (hitesh)
Date Fri, 07 Feb 2014 04:05:31 GMT
Updated Branches:
  refs/heads/master 38e545b18 -> 14127af12


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
new file mode 100644
index 0000000..782fff1
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.utils;
+
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.records.DAGProtos;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+public class DAGUtils {
+
+  public static JSONObject generateSimpleJSONPlan(DAGProtos.DAGPlan dagPlan) throws JSONException {
+
+    final String DAG_NAME_KEY = "dagName";
+    final String VERTICES_KEY = "vertices";
+    final String EDGES_KEY = "edges";
+
+    final String VERTEX_NAME_KEY = "vertexName";
+    final String PROCESSOR_CLASS_KEY = "processorClass";
+    final String IN_EDGE_IDS_KEY = "inEdgeIds";
+    final String OUT_EDGE_IDS_KEY = "outEdgeIds";
+    final String ADDITIONAL_INPUTS_KEY = "additionalInputs";
+    final String ADDITIONAL_OUTPUTS_KEY = "additionalOutputs";
+    final String VERTEX_MANAGER_PLUGIN_CLASS_KEY =
+        "vertexManagerPluginClass";
+
+    final String EDGE_ID_KEY = "edgeId";
+    final String INPUT_VERTEX_NAME_KEY = "inputVertexName";
+    final String OUTPUT_VERTEX_NAME_KEY = "outputVertexName";
+    final String DATA_MOVEMENT_TYPE_KEY = "dataMovementType";
+    final String DATA_SOURCE_TYPE_KEY = "dataSourceType";
+    final String SCHEDULING_TYPE_KEY = "schedulingType";
+    final String EDGE_SOURCE_CLASS_KEY = "edgeSourceClass";
+    final String EDGE_DESTINATION_CLASS_KEY =
+        "edgeDestinationClass";
+
+    final String NAME_KEY = "name";
+    final String CLASS_KEY = "class";
+    final String INITIALIZER_KEY = "initializer";
+
+    JSONObject dagJson = new JSONObject();
+    dagJson.put(DAG_NAME_KEY, dagPlan.getName());
+    for (DAGProtos.VertexPlan vertexPlan : dagPlan.getVertexList()) {
+      JSONObject vertexJson = new JSONObject();
+      vertexJson.put(VERTEX_NAME_KEY, vertexPlan.getName());
+
+      if (vertexPlan.hasProcessorDescriptor()) {
+        vertexJson.put(PROCESSOR_CLASS_KEY,
+            vertexPlan.getProcessorDescriptor().getClassName());
+      }
+
+      for (String inEdgeId : vertexPlan.getInEdgeIdList()) {
+        vertexJson.accumulate(IN_EDGE_IDS_KEY, inEdgeId);
+      }
+      for (String outEdgeId : vertexPlan.getOutEdgeIdList()) {
+        vertexJson.accumulate(OUT_EDGE_IDS_KEY, outEdgeId);
+      }
+
+      for (DAGProtos.RootInputLeafOutputProto input :
+          vertexPlan.getInputsList()) {
+        JSONObject jsonInput = new JSONObject();
+        jsonInput.put(NAME_KEY, input.getName());
+        jsonInput.put(CLASS_KEY, input.getEntityDescriptor().getClassName());
+        if (input.hasInitializerClassName()) {
+          jsonInput.put(INITIALIZER_KEY, input.getInitializerClassName());
+        }
+        vertexJson.accumulate(ADDITIONAL_INPUTS_KEY, jsonInput);
+      }
+
+      for (DAGProtos.RootInputLeafOutputProto output :
+          vertexPlan.getOutputsList()) {
+        JSONObject jsonOutput = new JSONObject();
+        jsonOutput.put(NAME_KEY, output.getName());
+        jsonOutput.put(CLASS_KEY, output.getEntityDescriptor().getClassName());
+        if (output.hasInitializerClassName()) {
+          jsonOutput.put(INITIALIZER_KEY, output.getInitializerClassName());
+        }
+        vertexJson.accumulate(ADDITIONAL_OUTPUTS_KEY, jsonOutput);
+      }
+
+      if (vertexPlan.hasVertexManagerPlugin()) {
+        vertexJson.put(VERTEX_MANAGER_PLUGIN_CLASS_KEY,
+            vertexPlan.getVertexManagerPlugin().getClassName());
+      }
+
+      dagJson.accumulate(VERTICES_KEY, vertexJson);
+    }
+
+    for (DAGProtos.EdgePlan edgePlan : dagPlan.getEdgeList()) {
+      JSONObject edgeJson = new JSONObject();
+      edgeJson.put(EDGE_ID_KEY, edgePlan.getId());
+      edgeJson.put(INPUT_VERTEX_NAME_KEY, edgePlan.getInputVertexName());
+      edgeJson.put(OUTPUT_VERTEX_NAME_KEY, edgePlan.getOutputVertexName());
+      edgeJson.put(DATA_MOVEMENT_TYPE_KEY,
+          edgePlan.getDataMovementType().name());
+      edgeJson.put(DATA_SOURCE_TYPE_KEY, edgePlan.getDataSourceType().name());
+      edgeJson.put(SCHEDULING_TYPE_KEY, edgePlan.getSchedulingType().name());
+      edgeJson.put(EDGE_SOURCE_CLASS_KEY,
+          edgePlan.getEdgeSource().getClassName());
+      edgeJson.put(EDGE_DESTINATION_CLASS_KEY,
+          edgePlan.getEdgeDestination().getClassName());
+
+      dagJson.accumulate(EDGES_KEY, edgeJson);
+    }
+
+    return dagJson;
+  }
+
+  public static JSONObject convertCountersToJSON(TezCounters counters)
+      throws JSONException {
+    JSONObject jsonObject = new JSONObject();
+    if (counters == null) {
+      return jsonObject;
+    }
+
+    for (CounterGroup group : counters) {
+      JSONObject jsonCGrp = new JSONObject();
+      jsonCGrp.put(ATSConstants.COUNTER_GROUP_NAME, group.getName());
+      jsonCGrp.put(ATSConstants.COUNTER_GROUP_DISPLAY_NAME,
+          group.getDisplayName());
+      for (TezCounter counter : group) {
+        JSONObject counterJson = new JSONObject();
+        counterJson.put(ATSConstants.COUNTER_NAME, counter.getName());
+        counterJson.put(ATSConstants.COUNTER_DISPLAY_NAME,
+            counter.getDisplayName());
+        counterJson.put(ATSConstants.COUNTER_VALUE, counter.getValue());
+        jsonCGrp.accumulate(ATSConstants.COUNTERS, counterJson);
+      }
+      jsonObject.accumulate(ATSConstants.COUNTER_GROUPS, jsonCGrp);
+    }
+    return jsonObject;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java
new file mode 100644
index 0000000..5b87bc8
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.recovery;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.AMLaunchedEvent;
+import org.apache.tez.dag.history.events.AMStartedEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.history.events.DAGFinishedEvent;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
+import org.apache.tez.dag.history.events.DAGSubmittedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.history.events.TaskFinishedEvent;
+import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
+import org.apache.tez.dag.history.events.VertexFinishedEvent;
+import org.apache.tez.dag.history.events.VertexInitializedEvent;
+import org.apache.tez.dag.history.events.VertexStartedEvent;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+
+import java.io.IOException;
+
+public class RecoveryParser {
+
+  private static final Log LOG = LogFactory.getLog(RecoveryParser.class);
+
+  Path recoveryDirectory;
+  FileSystem recoveryDirFS;
+
+  public RecoveryParser(Path recoveryDirectory, Configuration conf)
+      throws IOException {
+    this.recoveryDirectory = recoveryDirectory;
+    recoveryDirFS = FileSystem.get(recoveryDirectory.toUri(), conf);
+
+  }
+
+  public void parse() throws IOException {
+    RemoteIterator<LocatedFileStatus> locatedFilesStatus =
+        recoveryDirFS.listFiles(recoveryDirectory, false);
+    while (locatedFilesStatus.hasNext()) {
+      LocatedFileStatus fileStatus = locatedFilesStatus.next();
+      String fileName = fileStatus.getPath().getName();
+      if (fileName.endsWith(TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX)) {
+        FSDataInputStream inputStream =
+            recoveryDirFS.open(fileStatus.getPath());
+        LOG.info("Parsing DAG file " + fileName);
+        parseDAGRecoveryFile(inputStream);
+      } else if (fileName.endsWith(TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX)) {
+        FSDataInputStream inputStream =
+            recoveryDirFS.open(fileStatus.getPath());
+        LOG.info("Parsing Summary file " + fileName);
+        parseSummaryFile(inputStream);
+      } else {
+        LOG.warn("Encountered unknown file in recovery dir, fileName="
+            + fileName);
+        continue;
+      }
+    }
+  }
+
+  private void parseSummaryFile(FSDataInputStream inputStream)
+      throws IOException {
+    int counter = 0;
+    while (inputStream.available() > 0) {
+      RecoveryProtos.SummaryEventProto proto =
+          RecoveryProtos.SummaryEventProto.parseDelimitedFrom(inputStream);
+      LOG.info("[SUMMARY]"
+          + " dagId=" + proto.getDagId()
+          + ", timestamp=" + proto.getTimestamp()
+          + ", event=" + HistoryEventType.values()[proto.getEventType()]);
+    }
+  }
+
+  private void parseDAGRecoveryFile(FSDataInputStream inputStream)
+      throws IOException {
+    int counter = 0;
+    while (inputStream.available() > 0) {
+      int eventTypeOrdinal = inputStream.read();
+      if (eventTypeOrdinal < 0 || eventTypeOrdinal >=
+          HistoryEventType.values().length) {
+        // Corrupt data
+        // reached end
+        LOG.warn("Corrupt data found when trying to read next event type");
+        break;
+      }
+      HistoryEventType eventType = HistoryEventType.values()[eventTypeOrdinal];
+      HistoryEvent event = null;
+      switch (eventType) {
+        case AM_LAUNCHED:
+          event = new AMLaunchedEvent();
+          break;
+        case AM_STARTED:
+          event = new AMStartedEvent();
+          break;
+        case DAG_SUBMITTED:
+          event = new DAGSubmittedEvent();
+          break;
+        case DAG_INITIALIZED:
+          event = new DAGInitializedEvent();
+          break;
+        case DAG_STARTED:
+          event = new DAGStartedEvent();
+          break;
+        case DAG_FINISHED:
+          event = new DAGFinishedEvent();
+          break;
+        case CONTAINER_LAUNCHED:
+          event = new ContainerLaunchedEvent();
+          break;
+        case VERTEX_INITIALIZED:
+          event = new VertexInitializedEvent();
+          break;
+        case VERTEX_STARTED:
+          event = new VertexStartedEvent();
+          break;
+        case VERTEX_FINISHED:
+          event = new VertexFinishedEvent();
+          break;
+        case TASK_STARTED:
+          event = new TaskStartedEvent();
+          break;
+        case TASK_FINISHED:
+          event = new TaskFinishedEvent();
+          break;
+        case TASK_ATTEMPT_STARTED:
+          event = new TaskAttemptStartedEvent();
+          break;
+        case TASK_ATTEMPT_FINISHED:
+          event = new TaskAttemptFinishedEvent();
+          break;
+        case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
+          event = new VertexDataMovementEventsGeneratedEvent();
+          break;
+        default:
+          throw new IOException("Invalid data found, unknown event type "
+              + eventType);
+
+      }
+      ++counter;
+      LOG.info("Parsing event from input stream"
+          + ", eventType=" + eventType
+          + ", eventIndex=" + counter);
+      event.fromProtoStream(inputStream);
+      LOG.info("Parsed event from input stream"
+          + ", eventType=" + eventType
+          + ", eventIndex=" + counter
+          + ", event=" + event.toString());
+    }
+  }
+
+  public static void main(String argv[]) throws IOException {
+    // TODO clean up with better usage and error handling
+    Configuration conf = new Configuration();
+    String dir = argv[0];
+    RecoveryParser parser = new RecoveryParser(new Path(dir), conf);
+    parser.parse();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java
new file mode 100644
index 0000000..d17637a
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.utils;
+
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+
+public class ProtoUtils {
+
+  public static RecoveryProtos.SummaryEventProto toSummaryEventProto(
+      TezDAGID dagID, long timestamp, HistoryEventType historyEventType) {
+    return RecoveryProtos.SummaryEventProto.newBuilder()
+        .setDagId(dagID.toString())
+        .setTimestamp(timestamp)
+        .setEventType(historyEventType.ordinal()).build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
new file mode 100644
index 0000000..e21f5df
--- /dev/null
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.dag.recovery.records";
+option java_outer_classname = "RecoveryProtos";
+option java_generate_equals_and_hash = true;
+
+import "DAGApiRecords.proto";
+import "Events.proto";
+
+message AMLaunchedProto {
+  optional string application_attempt_id = 1;
+  optional int64 app_submit_time = 2;
+  optional int64 launch_time = 3;
+}
+
+message AMStartedProto {
+  optional string application_attempt_id = 1;
+  optional int64 start_time = 2;
+}
+
+message ContainerLaunchedProto {
+  optional string application_attempt_id = 1;
+  optional string container_id = 2;
+  optional int64 launch_time = 3;
+}
+message DAGSubmittedProto {
+  optional string dag_id = 1;
+  optional DAGPlan dag_plan = 2;
+  optional int64 submit_time = 3;
+  optional string application_attempt_id = 4;
+}
+
+message DAGInitializedProto {
+  optional string dag_id = 1;
+  optional int64 init_time = 2;
+}
+
+message DAGStartedProto {
+  optional string dag_id = 1;
+  optional int64 start_time = 3;
+}
+
+message DAGFinishedProto {
+  optional string dag_id = 1;
+  optional int64 finish_time = 2;
+  optional int32 state = 3;
+  optional string diagnostics = 4;
+  optional TezCountersProto counters = 5;
+}
+
+message VertexInitializedProto {
+  optional string vertex_name = 1;
+  optional string vertex_id = 2;
+  optional int64 init_requested_time = 3;
+  optional int64 init_time = 4;
+  optional int64 num_tasks = 5;
+}
+
+message VertexStartedProto {
+  optional string vertex_name = 1;
+  optional string vertex_id = 2;
+  optional int64 start_requested_time = 3;
+  optional int64 start_time = 4;
+}
+
+message VertexFinishedProto {
+  optional string vertex_name = 1;
+  optional string vertex_id = 2;
+  optional int64 finish_time = 3;
+  optional int32 state = 4;
+  optional string diagnostics = 5;
+  optional TezCountersProto counters = 6;
+}
+
+message TaskStartedProto {
+  optional string task_id = 1;
+  optional int64 scheduled_time = 2;
+  optional int64 launch_time = 3;
+}
+
+message TaskFinishedProto {
+  optional string task_id = 1;
+  optional int64 finish_time = 2;
+  optional int32 state = 3;
+  optional string diagnostics = 4;
+  optional TezCountersProto counters = 5;
+}
+
+message TaskAttemptStartedProto {
+  optional string task_attempt_id = 1;
+  optional int64 start_time = 2;
+  optional string container_id = 3;
+  optional string node_id = 4;
+}
+
+message TaskAttemptFinishedProto {
+  optional string task_attempt_id = 1;
+  optional int64 finish_time = 2;
+  optional int32 state = 3;
+  optional string diagnostics = 4;
+  optional TezCountersProto counters = 5;
+}
+
+message EventMetaDataProto {
+  optional int32 producer_consumer_type = 1;
+  optional string task_vertex_name = 2;
+  optional string edge_vertex_name = 3;
+  optional string task_attempt_id = 4;
+}
+
+message TezDataMovementEventProto {
+  optional EventMetaDataProto source_info = 1;
+  optional EventMetaDataProto destination_info = 2;
+  optional DataMovementEventProto data_movement_event = 3;
+  optional CompositeEventProto composite_data_movement_event = 4;
+  optional RootInputDataInformationEventProto root_input_data_information_event = 5;
+}
+
+message VertexDataMovementEventsGeneratedProto {
+  optional string vertex_id = 1;
+  repeated TezDataMovementEventProto tez_data_movement_event = 2;
+}
+
+message SummaryEventProto {
+  optional string dag_id = 1;
+  optional int64 timestamp = 2;
+  optional int32 event_type = 3;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 4e02f59..1b66a4b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -86,8 +86,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
 import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.avro.HistoryEventType;
+import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
@@ -126,6 +125,7 @@ public class TestDAGImpl {
   private DAGPlan groupDagPlan;
   private DAGImpl groupDag;
   private TezDAGID groupDagId;
+  private HistoryEventHandler historyEventHandler;
 
   private class DagEventDispatcher implements EventHandler<DAGEvent> {
     @Override
@@ -143,12 +143,6 @@ public class TestDAGImpl {
     }
   }
 
-  private class HistoryHandler implements EventHandler<DAGHistoryEvent> {
-    @Override
-    public void handle(DAGHistoryEvent event) {
-    }
-  }
-
   private class TaskEventHandler implements EventHandler<TaskEvent> {
     @Override
     public void handle(TaskEvent event) {
@@ -595,9 +589,11 @@ public class TestDAGImpl {
     dispatcher = new DrainDispatcher();
     fsTokens = new Credentials();
     appContext = mock(AppContext.class);
+    historyEventHandler = mock(HistoryEventHandler.class);
     doReturn(conf).when(appContext).getAMConf();
     doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
     doReturn(dagId).when(appContext).getCurrentDAGID();
+    doReturn(historyEventHandler).when(appContext).getHistoryHandler();
     dag = new DAGImpl(dagId, conf, dagPlan,
         dispatcher.getEventHandler(),  taskAttemptListener,
         fsTokens, clock, "user", thh, appContext);
@@ -612,6 +608,7 @@ public class TestDAGImpl {
     doReturn(conf).when(mrrAppContext).getAMConf();
     doReturn(mrrDag).when(mrrAppContext).getCurrentDAG();
     doReturn(appAttemptId).when(mrrAppContext).getApplicationAttemptId();
+    doReturn(historyEventHandler).when(mrrAppContext).getHistoryHandler();
     groupAppContext = mock(AppContext.class);
     groupDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 3);
     groupDagPlan = createGroupDAGPlan();
@@ -622,14 +619,13 @@ public class TestDAGImpl {
     doReturn(conf).when(groupAppContext).getAMConf();
     doReturn(groupDag).when(groupAppContext).getCurrentDAG();
     doReturn(appAttemptId).when(groupAppContext).getApplicationAttemptId();
+    doReturn(historyEventHandler).when(groupAppContext).getHistoryHandler();
     taskEventDispatcher = new TaskEventDispatcher();
     dispatcher.register(TaskEventType.class, taskEventDispatcher);
     vertexEventDispatcher = new VertexEventDispatcher();
     dispatcher.register(VertexEventType.class, vertexEventDispatcher);
     dagEventDispatcher = new DagEventDispatcher();
     dispatcher.register(DAGEventType.class, dagEventDispatcher);
-    dispatcher.register(HistoryEventType.class,
-        new HistoryHandler());
     dagFinishEventHandler = new DAGFinishEventHandler();
     dispatcher.register(DAGAppMasterEventType.class, dagFinishEventHandler);
     dispatcher.register(TaskEventType.class, new TaskEventHandler());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 6a668e3..c17adea 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -102,8 +102,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventTermination;
 import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
 import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.avro.HistoryEventType;
+import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -156,6 +155,7 @@ public class TestVertexImpl {
   private TaskEventDispatcher taskEventDispatcher;
   private VertexEventDispatcher vertexEventDispatcher;
   private DagEventDispatcher dagEventDispatcher;
+  private HistoryEventHandler historyEventHandler;
 
   public static class CountingOutputCommitter extends
       OutputCommitter {
@@ -303,12 +303,6 @@ public class TestVertexImpl {
     }
   }
 
-  private class HistoryHandler implements EventHandler<DAGHistoryEvent> {
-    @Override
-    public void handle(DAGHistoryEvent event) {
-    }
-  }
-
   private class VertexEventDispatcher
       implements EventHandler<VertexEvent> {
 
@@ -1189,6 +1183,7 @@ public class TestVertexImpl {
   public void setupPostDagCreation() {
     dispatcher = new DrainDispatcher();
     appContext = mock(AppContext.class);
+    historyEventHandler = mock(HistoryEventHandler.class);
     TaskSchedulerEventHandler taskScheduler = mock(TaskSchedulerEventHandler.class);
     UserGroupInformation ugi;
     try {
@@ -1207,6 +1202,7 @@ public class TestVertexImpl {
     doReturn(dagId).when(dag).getID();
     doReturn(taskScheduler).when(appContext).getTaskScheduler();
     doReturn(Resource.newInstance(102400, 60)).when(taskScheduler).getTotalResources();
+    doReturn(historyEventHandler).when(appContext).getHistoryHandler();
 
     vertexGroups = Maps.newHashMap();
     for (PlanVertexGroupInfo groupInfo : dagPlan.getVertexGroupsList()) {
@@ -1259,7 +1255,6 @@ public class TestVertexImpl {
     dispatcher.register(VertexEventType.class, vertexEventDispatcher);
     dagEventDispatcher = new DagEventDispatcher();
     dispatcher.register(DAGEventType.class, dagEventDispatcher);
-    dispatcher.register(HistoryEventType.class, new HistoryHandler());
     dispatcher.init(conf);
     dispatcher.start();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 7afc667..2000668 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -320,6 +320,9 @@ public class OrderedWordCount {
     boolean useTezSession = conf.getBoolean("USE_TEZ_SESSION", true);
     long interJobSleepTimeout = conf.getInt("INTER_JOB_SLEEP_INTERVAL", 0)
         * 1000;
+
+    boolean retainStagingDir = conf.getBoolean("RETAIN_STAGING_DIR", false);
+
     if (((otherArgs.length%2) != 0)
         || (!useTezSession && otherArgs.length != 2)) {
       printUsage();
@@ -506,7 +509,9 @@ public class OrderedWordCount {
         }
       }
     } finally {
-      fs.delete(stagingDir, true);
+      if (!retainStagingDir) {
+        fs.delete(stagingDir, true);
+      }
       if (useTezSession) {
         tezSession.stop();
       }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerTask.java
index 7dc5978..c0a2574 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerTask.java
@@ -150,4 +150,5 @@ public class ContainerTask implements Writable {
     }
     return sb.toString();
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
new file mode 100644
index 0000000..8e17508
--- /dev/null
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import com.google.protobuf.ByteString;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.EventProtos;
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+
+/**
+ * Static helpers that convert runtime events to and from their
+ * {@link EventProtos} protobuf messages.
+ *
+ * <p>When converting to a proto, the user payload is only set on the builder
+ * if the event's payload is non-null. When converting from a proto, the
+ * payload is restored as {@code null} if the proto field is not set.</p>
+ */
+public class ProtoConverters {
+
+  /**
+   * Builds a proto from the given event's source index, target index,
+   * version and, if non-null, user payload.
+   *
+   * @param event the event to convert
+   * @return the populated proto message
+   */
+  public static EventProtos.DataMovementEventProto convertDataMovementEventToProto(
+      DataMovementEvent event) {
+    EventProtos.DataMovementEventProto.Builder builder =
+        EventProtos.DataMovementEventProto.newBuilder();
+    builder.setSourceIndex(event.getSourceIndex()).
+        setTargetIndex(event.getTargetIndex()).setVersion(event.getVersion());
+    if (event.getUserPayload() != null) {
+      builder.setUserPayload(ByteString.copyFrom(event.getUserPayload()));
+    }
+    return builder.build();
+  }
+
+  /**
+   * Rebuilds an event from the given proto's source index, target index,
+   * version and user payload.
+   *
+   * @param proto the proto message to convert
+   * @return the event, constructed with a {@code null} user payload when the
+   *         proto has none set
+   */
+  public static DataMovementEvent convertDataMovementEventFromProto(
+      EventProtos.DataMovementEventProto proto) {
+    // Generated protobuf getters never return null, so presence has to be
+    // checked with hasUserPayload() to restore a null payload.
+    return new DataMovementEvent(proto.getSourceIndex(),
+        proto.getTargetIndex(),
+        proto.getVersion(),
+        proto.hasUserPayload() ?
+            proto.getUserPayload().toByteArray() : null);
+  }
+
+  /**
+   * Builds a proto from the given event's source index range and, if
+   * non-null, user payload.
+   *
+   * @param event the event to convert
+   * @return the populated proto message
+   */
+  public static EventProtos.CompositeEventProto convertCompositeDataMovementEventToProto(
+      CompositeDataMovementEvent event) {
+    EventProtos.CompositeEventProto.Builder builder =
+        EventProtos.CompositeEventProto.newBuilder();
+    builder.setStartIndex(event.getSourceIndexStart());
+    builder.setEndIndex(event.getSourceIndexEnd());
+    if (event.getUserPayload() != null) {
+      builder.setUserPayload(ByteString.copyFrom(event.getUserPayload()));
+    }
+    return builder.build();
+  }
+
+  /**
+   * Rebuilds an event from the given proto's start index, end index and
+   * user payload.
+   *
+   * @param proto the proto message to convert
+   * @return the event, constructed with a {@code null} user payload when the
+   *         proto has none set
+   */
+  public static CompositeDataMovementEvent convertCompositeDataMovementEventFromProto(
+      EventProtos.CompositeEventProto proto) {
+    return new CompositeDataMovementEvent(proto.getStartIndex(),
+        proto.getEndIndex(),
+        proto.hasUserPayload() ?
+            proto.getUserPayload().toByteArray() : null);
+  }
+
+  /**
+   * Builds a proto from the given event's index and, if non-null, user
+   * payload.
+   *
+   * @param event the event to convert
+   * @return the populated proto message
+   */
+  public static EventProtos.RootInputDataInformationEventProto
+      convertRootInputDataInformationEventToProto(RootInputDataInformationEvent event) {
+    EventProtos.RootInputDataInformationEventProto.Builder builder =
+        EventProtos.RootInputDataInformationEventProto.newBuilder();
+    builder.setIndex(event.getIndex());
+    if (event.getUserPayload() != null) {
+      builder.setUserPayload(ByteString.copyFrom(event.getUserPayload()));
+    }
+    return builder.build();
+  }
+
+  /**
+   * Rebuilds an event from the given proto's index and user payload.
+   *
+   * @param proto the proto message to convert
+   * @return the event, constructed with a {@code null} user payload when the
+   *         proto has none set
+   */
+  public static RootInputDataInformationEvent
+      convertRootInputDataInformationEventFromProto(
+      EventProtos.RootInputDataInformationEventProto proto) {
+    // Generated protobuf getters never return null, so presence has to be
+    // checked with hasUserPayload() to restore a null payload.
+    return new RootInputDataInformationEvent(proto.getIndex(),
+        proto.hasUserPayload() ?
+            proto.getUserPayload().toByteArray() : null);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index 51042c2..ecadeb5 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -23,6 +23,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.io.Writable;
+import org.apache.tez.common.ProtoConverters;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
@@ -124,24 +125,14 @@ public class TezEvent implements Writable {
       byte[] eventBytes = null;
       switch (eventType) {
       case DATA_MOVEMENT_EVENT:
-        DataMovementEvent dmEvt = (DataMovementEvent) event;
-        DataMovementEventProto.Builder dmBuilder = DataMovementEventProto.newBuilder();
-        dmBuilder.setSourceIndex(dmEvt.getSourceIndex()).
-        setTargetIndex(dmEvt.getTargetIndex()).setVersion(dmEvt.getVersion());
-        if (dmEvt.getUserPayload() != null) {
-          dmBuilder.setUserPayload(ByteString.copyFrom(dmEvt.getUserPayload()));
-        }
-        eventBytes = dmBuilder.build().toByteArray();
+        eventBytes =
+            ProtoConverters.convertDataMovementEventToProto(
+                (DataMovementEvent) event).toByteArray();
         break;
       case COMPOSITE_DATA_MOVEMENT_EVENT:
-        CompositeDataMovementEvent cEvent = (CompositeDataMovementEvent) event;
-        CompositeEventProto.Builder cBuilder = CompositeEventProto.newBuilder();
-        cBuilder.setStartIndex(cEvent.getSourceIndexStart());
-        cBuilder.setEndIndex(cEvent.getSourceIndexEnd());
-        if (cEvent.getUserPayload() != null) {
-          cBuilder.setUserPayload(ByteString.copyFrom(cEvent.getUserPayload()));
-        }
-        eventBytes = cBuilder.build().toByteArray();
+        eventBytes =
+            ProtoConverters.convertCompositeDataMovementEventToProto(
+                (CompositeDataMovementEvent) event).toByteArray();
         break;
       case VERTEX_MANAGER_EVENT:
         VertexManagerEvent vmEvt = (VertexManagerEvent) event;
@@ -178,14 +169,8 @@ public class TezEvent implements Writable {
             .setVersion(ifEvt.getVersion()).build().toByteArray();
         break;
       case ROOT_INPUT_DATA_INFORMATION_EVENT:
-        RootInputDataInformationEvent liEvent = (RootInputDataInformationEvent) event;
-        RootInputDataInformationEventProto.Builder riBuilder =
-            RootInputDataInformationEventProto.newBuilder();
-        riBuilder.setIndex(liEvent.getIndex());
-        if (liEvent.getUserPayload() != null) {
-          riBuilder.setUserPayload(ByteString.copyFrom(liEvent.getUserPayload()));
-        }
-        eventBytes = riBuilder.build().toByteArray();
+        eventBytes = ProtoConverters.convertRootInputDataInformationEventToProto(
+            (RootInputDataInformationEvent) event).toByteArray();
         break;
       default:
         throw new TezUncheckedException("Unknown TezEvent"
@@ -214,15 +199,11 @@ public class TezEvent implements Writable {
       case DATA_MOVEMENT_EVENT:
         DataMovementEventProto dmProto =
             DataMovementEventProto.parseFrom(eventBytes);
-        event = new DataMovementEvent(dmProto.getSourceIndex(),
-            dmProto.getTargetIndex(),
-            dmProto.getVersion(),
-            dmProto.getUserPayload() != null ? dmProto.getUserPayload().toByteArray() : null);
+        event = ProtoConverters.convertDataMovementEventFromProto(dmProto);
         break;
       case COMPOSITE_DATA_MOVEMENT_EVENT:
         CompositeEventProto cProto = CompositeEventProto.parseFrom(eventBytes);
-        event = new CompositeDataMovementEvent(cProto.getStartIndex(), cProto.getEndIndex(),
-            cProto.hasUserPayload() ? cProto.getUserPayload().toByteArray() : null);
+        event = ProtoConverters.convertCompositeDataMovementEventFromProto(cProto);
         break;
       case VERTEX_MANAGER_EVENT:
         VertexManagerEventProto vmProto =
@@ -253,8 +234,7 @@ public class TezEvent implements Writable {
       case ROOT_INPUT_DATA_INFORMATION_EVENT:
         RootInputDataInformationEventProto difProto = RootInputDataInformationEventProto
             .parseFrom(eventBytes);
-        event = new RootInputDataInformationEvent(difProto.getIndex(), 
-            difProto.getUserPayload() != null ? difProto.getUserPayload().toByteArray() : null);
+        event = ProtoConverters.convertRootInputDataInformationEventFromProto(difProto);
         break;
       default:
         // RootInputUpdatePayload event not wrapped in a TezEvent.


Mime
View raw message