Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 11BDE1049F for ; Fri, 7 Feb 2014 04:06:11 +0000 (UTC) Received: (qmail 79185 invoked by uid 500); 7 Feb 2014 04:06:10 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 79134 invoked by uid 500); 7 Feb 2014 04:06:07 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 79126 invoked by uid 99); 7 Feb 2014 04:06:05 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Feb 2014 04:06:05 +0000 X-ASF-Spam-Status: No, hits=-2000.5 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 07 Feb 2014 04:05:56 +0000 Received: (qmail 78589 invoked by uid 99); 7 Feb 2014 04:05:32 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 07 Feb 2014 04:05:32 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 3D0B891E3F6; Fri, 7 Feb 2014 04:05:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hitesh@apache.org To: commits@tez.incubator.apache.org Date: Fri, 07 Feb 2014 04:05:31 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/3] TEZ-799. Generate data to be used for recovery. 
(hitesh) X-Virus-Checked: Checked by ClamAV on apache.org Updated Branches: refs/heads/master 38e545b18 -> 14127af12 http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java new file mode 100644 index 0000000..782fff1 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.history.utils; + +import org.apache.tez.common.counters.CounterGroup; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.records.DAGProtos; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; + +public class DAGUtils { /* Static helpers that render a DAGPlan and TezCounters as Jettison JSONObjects. */ + + public static JSONObject generateSimpleJSONPlan(DAGProtos.DAGPlan dagPlan) throws JSONException { /* Builds a flat JSON view of the plan: dagName, one "vertices" entry per VertexPlan and one "edges" entry per EdgePlan. NOTE(review): JSONObject.accumulate stores the first value as-is and only switches to a JSONArray on the second call, so a key with exactly one item (e.g. a single-vertex DAG, or one in-edge) holds an object rather than a one-element array, and a key with no items is absent. */ + + final String DAG_NAME_KEY = "dagName"; + final String VERTICES_KEY = "vertices"; + final String EDGES_KEY = "edges"; + + final String VERTEX_NAME_KEY = "vertexName"; + final String PROCESSOR_CLASS_KEY = "processorClass"; + final String IN_EDGE_IDS_KEY = "inEdgeIds"; + final String OUT_EDGE_IDS_KEY = "outEdgeIds"; + final String ADDITIONAL_INPUTS_KEY = "additionalInputs"; + final String ADDITIONAL_OUTPUTS_KEY = "additionalOutputs"; + final String VERTEX_MANAGER_PLUGIN_CLASS_KEY = + "vertexManagerPluginClass"; + + final String EDGE_ID_KEY = "edgeId"; + final String INPUT_VERTEX_NAME_KEY = "inputVertexName"; + final String OUTPUT_VERTEX_NAME_KEY = "outputVertexName"; + final String DATA_MOVEMENT_TYPE_KEY = "dataMovementType"; + final String DATA_SOURCE_TYPE_KEY = "dataSourceType"; + final String SCHEDULING_TYPE_KEY = "schedulingType"; + final String EDGE_SOURCE_CLASS_KEY = "edgeSourceClass"; + final String EDGE_DESTINATION_CLASS_KEY = + "edgeDestinationClass"; + + final String NAME_KEY = "name"; + final String CLASS_KEY = "class"; + final String INITIALIZER_KEY 
= "initializer"; + + JSONObject dagJson = new JSONObject(); + dagJson.put(DAG_NAME_KEY, dagPlan.getName()); + for (DAGProtos.VertexPlan vertexPlan : dagPlan.getVertexList()) { + JSONObject vertexJson = new JSONObject(); + vertexJson.put(VERTEX_NAME_KEY, vertexPlan.getName()); + + if (vertexPlan.hasProcessorDescriptor()) { + vertexJson.put(PROCESSOR_CLASS_KEY, + vertexPlan.getProcessorDescriptor().getClassName()); + } + + for (String
inEdgeId : vertexPlan.getInEdgeIdList()) { + vertexJson.accumulate(IN_EDGE_IDS_KEY, inEdgeId); + } + for (String outEdgeId : vertexPlan.getOutEdgeIdList()) { + vertexJson.accumulate(OUT_EDGE_IDS_KEY, outEdgeId); + } + + for (DAGProtos.RootInputLeafOutputProto input : + vertexPlan.getInputsList()) { + JSONObject jsonInput = new JSONObject(); + jsonInput.put(NAME_KEY, input.getName()); + jsonInput.put(CLASS_KEY, input.getEntityDescriptor().getClassName()); + if (input.hasInitializerClassName()) { + jsonInput.put(INITIALIZER_KEY, input.getInitializerClassName()); + } + vertexJson.accumulate(ADDITIONAL_INPUTS_KEY, jsonInput); + } + + for (DAGProtos.RootInputLeafOutputProto output : + vertexPlan.getOutputsList()) { + JSONObject jsonOutput = new JSONObject(); + jsonOutput.put(NAME_KEY, output.getName()); + jsonOutput.put(CLASS_KEY, output.getEntityDescriptor().getClassName()); + if (output.hasInitializerClassName()) { + jsonOutput.put(INITIALIZER_KEY, output.getInitializerClassName()); + } + vertexJson.accumulate(ADDITIONAL_OUTPUTS_KEY, jsonOutput); + } + + if (vertexPlan.hasVertexManagerPlugin()) { + vertexJson.put(VERTEX_MANAGER_PLUGIN_CLASS_KEY, + vertexPlan.getVertexManagerPlugin().getClassName()); + } + + dagJson.accumulate(VERTICES_KEY, vertexJson); + } + + for (DAGProtos.EdgePlan edgePlan : dagPlan.getEdgeList()) { + JSONObject edgeJson = new JSONObject(); + edgeJson.put(EDGE_ID_KEY, edgePlan.getId()); + edgeJson.put(INPUT_VERTEX_NAME_KEY, edgePlan.getInputVertexName()); + edgeJson.put(OUTPUT_VERTEX_NAME_KEY, edgePlan.getOutputVertexName()); + edgeJson.put(DATA_MOVEMENT_TYPE_KEY, + edgePlan.getDataMovementType().name()); + edgeJson.put(DATA_SOURCE_TYPE_KEY, edgePlan.getDataSourceType().name()); + edgeJson.put(SCHEDULING_TYPE_KEY, edgePlan.getSchedulingType().name()); + edgeJson.put(EDGE_SOURCE_CLASS_KEY, + edgePlan.getEdgeSource().getClassName()); + edgeJson.put(EDGE_DESTINATION_CLASS_KEY, + edgePlan.getEdgeDestination().getClassName()); +
dagJson.accumulate(EDGES_KEY, edgeJson); + } + + return dagJson; + } + + public static JSONObject convertCountersToJSON(TezCounters counters) + throws JSONException { /* Null counters yield an empty object; otherwise each CounterGroup is accumulated under ATSConstants.COUNTER_GROUPS, with its counters accumulated under ATSConstants.COUNTERS (same single-item accumulate caveat as generateSimpleJSONPlan). */ + JSONObject jsonObject = new JSONObject(); + if (counters == null) { + return jsonObject; + } + + for (CounterGroup group : counters) { + JSONObject jsonCGrp = new JSONObject(); + jsonCGrp.put(ATSConstants.COUNTER_GROUP_NAME, group.getName()); + jsonCGrp.put(ATSConstants.COUNTER_GROUP_DISPLAY_NAME, + group.getDisplayName()); + for (TezCounter counter : group) { + JSONObject counterJson = new JSONObject(); + counterJson.put(ATSConstants.COUNTER_NAME, counter.getName()); + counterJson.put(ATSConstants.COUNTER_DISPLAY_NAME, + counter.getDisplayName()); + counterJson.put(ATSConstants.COUNTER_VALUE, counter.getValue()); + jsonCGrp.accumulate(ATSConstants.COUNTERS, counterJson); + } + jsonObject.accumulate(ATSConstants.COUNTER_GROUPS, jsonCGrp); + } + return jsonObject; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java new file mode 100644 index 0000000..5b87bc8 --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.recovery; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.history.HistoryEvent; +import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.history.events.AMLaunchedEvent; +import org.apache.tez.dag.history.events.AMStartedEvent; +import org.apache.tez.dag.history.events.ContainerLaunchedEvent; +import org.apache.tez.dag.history.events.DAGFinishedEvent; +import org.apache.tez.dag.history.events.DAGInitializedEvent; +import org.apache.tez.dag.history.events.DAGStartedEvent; +import org.apache.tez.dag.history.events.DAGSubmittedEvent; +import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; +import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; +import org.apache.tez.dag.history.events.TaskFinishedEvent; +import org.apache.tez.dag.history.events.TaskStartedEvent; +import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent; +import org.apache.tez.dag.history.events.VertexFinishedEvent; +import org.apache.tez.dag.history.events.VertexInitializedEvent; +import org.apache.tez.dag.history.events.VertexStartedEvent; +import org.apache.tez.dag.recovery.records.RecoveryProtos; + +import 
java.io.IOException; + +public class RecoveryParser { /* Reads every file in a recovery directory and logs the decoded summary records and DAG history events; nothing is retained beyond the log output. */ + + private static final Log LOG = LogFactory.getLog(RecoveryParser.class); + + Path recoveryDirectory; + FileSystem recoveryDirFS; + + public RecoveryParser(Path recoveryDirectory, Configuration conf) + throws IOException { + this.recoveryDirectory = recoveryDirectory; + recoveryDirFS = FileSystem.get(recoveryDirectory.toUri(), conf); + + } + + public void parse() throws IOException { /* Non-recursive listing of recoveryDirectory; files are dispatched on suffix and each opened stream is closed after parsing, even if parsing throws. */ + RemoteIterator<LocatedFileStatus> locatedFilesStatus = + recoveryDirFS.listFiles(recoveryDirectory, false); + while (locatedFilesStatus.hasNext()) { + LocatedFileStatus fileStatus = locatedFilesStatus.next(); + String fileName = fileStatus.getPath().getName(); + if (fileName.endsWith(TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX)) { + FSDataInputStream inputStream = + recoveryDirFS.open(fileStatus.getPath()); + LOG.info("Parsing DAG file " + fileName); + try { parseDAGRecoveryFile(inputStream); } finally { inputStream.close(); } + } else if (fileName.endsWith(TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX)) { + FSDataInputStream inputStream = + recoveryDirFS.open(fileStatus.getPath()); + LOG.info("Parsing Summary file " + fileName); + try { parseSummaryFile(inputStream); } finally { inputStream.close(); } + } else { + LOG.warn("Encountered unknown file in recovery dir, fileName=" + + fileName); + continue; + } + } + } + + private void parseSummaryFile(FSDataInputStream inputStream) + throws IOException { /* Reads length-delimited SummaryEventProto records; event_type is decoded as a HistoryEventType ordinal. NOTE(review): available() is used as the end-of-data test; confirm it reports the remaining 
bytes for the FileSystem in use. */ + int counter = 0; + while (inputStream.available() > 0) { + RecoveryProtos.SummaryEventProto proto = + RecoveryProtos.SummaryEventProto.parseDelimitedFrom(inputStream); + LOG.info("[SUMMARY]" + + " dagId=" + proto.getDagId() + + ", timestamp=" + proto.getTimestamp() + + ", event=" + HistoryEventType.values()[proto.getEventType()]); + } + } + + private void parseDAGRecoveryFile(FSDataInputStream inputStream) + throws IOException { /* Record layout: a single byte holding the HistoryEventType ordinal, followed by the event body consumed by HistoryEvent.fromProtoStream. Stops at the first out-of-range type byte. */ + int counter = 0; + while (inputStream.available() > 0) { + int eventTypeOrdinal = inputStream.read(); + if (eventTypeOrdinal < 0 || eventTypeOrdinal >= + HistoryEventType.values().length) {
+ // Corrupt data + // reached end + LOG.warn("Corrupt data found when trying to read next event type"); + break; + } + HistoryEventType eventType = HistoryEventType.values()[eventTypeOrdinal]; + HistoryEvent event = null; + switch (eventType) { + case AM_LAUNCHED: + event = new AMLaunchedEvent(); + break; + case AM_STARTED: + event = new AMStartedEvent(); + break; + case DAG_SUBMITTED: + event = new DAGSubmittedEvent(); + break; + case DAG_INITIALIZED: + event = new DAGInitializedEvent(); + break; + case DAG_STARTED: + event = new DAGStartedEvent(); + break; + case DAG_FINISHED: + event = new DAGFinishedEvent(); + break; + case CONTAINER_LAUNCHED: + event = new ContainerLaunchedEvent(); + break; + case VERTEX_INITIALIZED: + event = new VertexInitializedEvent(); + break; + case VERTEX_STARTED: + event = new VertexStartedEvent(); + break; + case VERTEX_FINISHED: + event = new VertexFinishedEvent(); + break; + case TASK_STARTED: + event = new TaskStartedEvent(); + break; + case TASK_FINISHED: + event = new TaskFinishedEvent(); + break; + case TASK_ATTEMPT_STARTED: + event = new TaskAttemptStartedEvent(); + break; + case TASK_ATTEMPT_FINISHED: + event = new TaskAttemptFinishedEvent(); + break; + case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED: + event = new VertexDataMovementEventsGeneratedEvent(); + break; + default: + throw new IOException("Invalid data found, unknown event type " + + eventType); + + } + ++counter; + LOG.info("Parsing event from input stream" + + ", eventType=" + eventType + + ", eventIndex=" + counter); + event.fromProtoStream(inputStream); + LOG.info("Parsed event from input stream" + + ", eventType=" + eventType + + ", eventIndex=" + counter + + ", event=" + event.toString()); + } + } + + public static void main(String argv[]) throws IOException { /* Expects the recovery directory path as argv[0]. */ + // TODO clean up with better usage and error handling + Configuration conf = new Configuration(); + String dir = argv[0]; + RecoveryParser parser = new RecoveryParser(new Path(dir), conf); + parser.parse();
+ } +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java new file mode 100644 index 0000000..d17637a --- /dev/null +++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.tez.dag.utils; + +import org.apache.tez.dag.history.HistoryEventType; +import org.apache.tez.dag.records.TezDAGID; +import org.apache.tez.dag.recovery.records.RecoveryProtos; + +public class ProtoUtils { /* Static helpers for building recovery protos. */ + + public static RecoveryProtos.SummaryEventProto toSummaryEventProto( + TezDAGID dagID, long timestamp, HistoryEventType historyEventType) { /* Builds a SummaryEventProto keyed by dagID.toString(). The event type is persisted as historyEventType.ordinal(), and RecoveryParser decodes it positionally via HistoryEventType.values(). NOTE(review): reordering or inserting HistoryEventType constants would therefore misread previously written summary files. */ + return RecoveryProtos.SummaryEventProto.newBuilder() + .setDagId(dagID.toString()) + .setTimestamp(timestamp) + .setEventType(historyEventType.ordinal()).build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/main/proto/HistoryEvents.proto ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto new file mode 100644 index 0000000..e21f5df --- /dev/null +++ b/tez-dag/src/main/proto/HistoryEvents.proto @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +option java_package = "org.apache.tez.dag.recovery.records"; +option java_outer_classname = "RecoveryProtos"; +option java_generate_equals_and_hash = true; + +import "DAGApiRecords.proto"; +import "Events.proto"; + +message AMLaunchedProto { + optional string application_attempt_id = 1; + optional int64 app_submit_time = 2; + optional int64 launch_time = 3; +} + +message AMStartedProto { + optional string application_attempt_id = 1; + optional int64 start_time = 2; +} + +message ContainerLaunchedProto { + optional string application_attempt_id = 1; + optional string container_id = 2; + optional int64 launch_time = 3; +} +message DAGSubmittedProto { + optional string dag_id = 1; + optional DAGPlan dag_plan = 2; + optional int64 submit_time = 3; + optional string application_attempt_id = 4; +} + +message DAGInitializedProto { + optional string dag_id = 1; + optional int64 init_time = 2; +} + +message DAGStartedProto { /* NOTE(review): tag 2 is unused in this message; start_time is tag 3. */ + optional string dag_id = 1; + optional int64 start_time = 3; +} + +message DAGFinishedProto { /* NOTE(review): state is an int32, presumably an enum ordinal; confirm against the writing event class. */ + optional string dag_id = 1; + optional int64 finish_time = 2; + optional int32 state = 3; + optional string diagnostics = 4; + optional TezCountersProto counters = 5; +} + +message VertexInitializedProto { + optional string vertex_name = 1; + optional string vertex_id = 2; + optional int64 init_requested_time = 3; + optional int64 init_time = 4; + optional int64 num_tasks = 5; +} + +message VertexStartedProto { + optional string vertex_name = 1; + optional string vertex_id = 2; + optional int64 start_requested_time = 3; + optional int64 start_time = 4; +} + +message VertexFinishedProto { + optional string vertex_name = 1; + optional string vertex_id = 2; + optional int64 finish_time = 3; + optional int32 state = 4; + optional string diagnostics = 5; + optional 
TezCountersProto counters = 6; +} + +message TaskStartedProto { + optional string task_id = 1; + optional int64 scheduled_time = 2; + optional int64 launch_time = 3; +} + +message
TaskFinishedProto { + optional string task_id = 1; + optional int64 finish_time = 2; + optional int32 state = 3; + optional string diagnostics = 4; + optional TezCountersProto counters = 5; +} + +message TaskAttemptStartedProto { + optional string task_attempt_id = 1; + optional int64 start_time = 2; + optional string container_id = 3; + optional string node_id = 4; +} + +message TaskAttemptFinishedProto { + optional string task_attempt_id = 1; + optional int64 finish_time = 2; + optional int32 state = 3; + optional string diagnostics = 4; + optional TezCountersProto counters = 5; +} + +message EventMetaDataProto { + optional int32 producer_consumer_type = 1; + optional string task_vertex_name = 2; + optional string edge_vertex_name = 3; + optional string task_attempt_id = 4; +} + +message TezDataMovementEventProto { /* presumably exactly one of the three event fields (tags 3-5) is set per message; TODO confirm against the writer. */ + optional EventMetaDataProto source_info = 1; + optional EventMetaDataProto destination_info = 2; + optional DataMovementEventProto data_movement_event = 3; + optional CompositeEventProto composite_data_movement_event = 4; + optional RootInputDataInformationEventProto root_input_data_information_event = 5; +} + +message VertexDataMovementEventsGeneratedProto { + optional string vertex_id = 1; + repeated TezDataMovementEventProto tez_data_movement_event = 2; +} + +message SummaryEventProto { /* event_type holds HistoryEventType.ordinal(), as written by ProtoUtils.toSummaryEventProto and decoded by RecoveryParser via HistoryEventType.values(). */ + optional string dag_id = 1; + optional int64 timestamp = 2; + optional int32 event_type = 3; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java 
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java index 4e02f59..1b66a4b 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java +++
b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java @@ -86,8 +86,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted; import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule; import org.apache.tez.dag.app.dag.event.VertexEventType; import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter; -import org.apache.tez.dag.history.DAGHistoryEvent; -import org.apache.tez.dag.history.avro.HistoryEventType; +import org.apache.tez.dag.history.HistoryEventHandler; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; @@ -126,6 +125,7 @@ public class TestDAGImpl { private DAGPlan groupDagPlan; private DAGImpl groupDag; private TezDAGID groupDagId; + private HistoryEventHandler historyEventHandler; private class DagEventDispatcher implements EventHandler { @Override @@ -143,12 +143,6 @@ public class TestDAGImpl { } } - private class HistoryHandler implements EventHandler { - @Override - public void handle(DAGHistoryEvent event) { - } - } - private class TaskEventHandler implements EventHandler { @Override public void handle(TaskEvent event) { @@ -595,9 +589,11 @@ public class TestDAGImpl { dispatcher = new DrainDispatcher(); fsTokens = new Credentials(); appContext = mock(AppContext.class); + historyEventHandler = mock(HistoryEventHandler.class); doReturn(conf).when(appContext).getAMConf(); doReturn(appAttemptId).when(appContext).getApplicationAttemptId(); doReturn(dagId).when(appContext).getCurrentDAGID(); + doReturn(historyEventHandler).when(appContext).getHistoryHandler(); dag = new DAGImpl(dagId, conf, dagPlan, dispatcher.getEventHandler(), taskAttemptListener, fsTokens, clock, "user", thh, appContext); @@ -612,6 +608,7 @@ public class TestDAGImpl { doReturn(conf).when(mrrAppContext).getAMConf(); doReturn(mrrDag).when(mrrAppContext).getCurrentDAG(); doReturn(appAttemptId).when(mrrAppContext).getApplicationAttemptId(); + 
doReturn(historyEventHandler).when(mrrAppContext).getHistoryHandler(); groupAppContext = mock(AppContext.class); groupDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 3); groupDagPlan = createGroupDAGPlan(); @@ -622,14 +619,13 @@ public class TestDAGImpl { doReturn(conf).when(groupAppContext).getAMConf(); doReturn(groupDag).when(groupAppContext).getCurrentDAG(); doReturn(appAttemptId).when(groupAppContext).getApplicationAttemptId(); + doReturn(historyEventHandler).when(groupAppContext).getHistoryHandler(); taskEventDispatcher = new TaskEventDispatcher(); dispatcher.register(TaskEventType.class, taskEventDispatcher); vertexEventDispatcher = new VertexEventDispatcher(); dispatcher.register(VertexEventType.class, vertexEventDispatcher); dagEventDispatcher = new DagEventDispatcher(); dispatcher.register(DAGEventType.class, dagEventDispatcher); - dispatcher.register(HistoryEventType.class, - new HistoryHandler()); dagFinishEventHandler = new DAGFinishEventHandler(); dispatcher.register(DAGAppMasterEventType.class, dagFinishEventHandler); dispatcher.register(TaskEventType.class, new TaskEventHandler()); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java index 6a668e3..c17adea 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java @@ -102,8 +102,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventTermination; import org.apache.tez.dag.app.dag.event.VertexEventType; import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo; import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler; -import 
org.apache.tez.dag.history.DAGHistoryEvent; -import org.apache.tez.dag.history.avro.HistoryEventType; +import org.apache.tez.dag.history.HistoryEventHandler; import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -156,6 +155,7 @@ public class TestVertexImpl { private TaskEventDispatcher taskEventDispatcher; private VertexEventDispatcher vertexEventDispatcher; private DagEventDispatcher dagEventDispatcher; + private HistoryEventHandler historyEventHandler; public static class CountingOutputCommitter extends OutputCommitter { @@ -303,12 +303,6 @@ public class TestVertexImpl { } } - private class HistoryHandler implements EventHandler { - @Override - public void handle(DAGHistoryEvent event) { - } - } - private class VertexEventDispatcher implements EventHandler { @@ -1189,6 +1183,7 @@ public class TestVertexImpl { public void setupPostDagCreation() { dispatcher = new DrainDispatcher(); appContext = mock(AppContext.class); + historyEventHandler = mock(HistoryEventHandler.class); TaskSchedulerEventHandler taskScheduler = mock(TaskSchedulerEventHandler.class); UserGroupInformation ugi; try { @@ -1207,6 +1202,7 @@ public class TestVertexImpl { doReturn(dagId).when(dag).getID(); doReturn(taskScheduler).when(appContext).getTaskScheduler(); doReturn(Resource.newInstance(102400, 60)).when(taskScheduler).getTotalResources(); + doReturn(historyEventHandler).when(appContext).getHistoryHandler(); vertexGroups = Maps.newHashMap(); for (PlanVertexGroupInfo groupInfo : dagPlan.getVertexGroupsList()) { @@ -1259,7 +1255,6 @@ public class TestVertexImpl { dispatcher.register(VertexEventType.class, vertexEventDispatcher); dagEventDispatcher = new DagEventDispatcher(); dispatcher.register(DAGEventType.class, dagEventDispatcher); - dispatcher.register(HistoryEventType.class, new HistoryHandler()); dispatcher.init(conf); dispatcher.start(); } 
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java index 7afc667..2000668 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java @@ -320,6 +320,9 @@ public class OrderedWordCount { boolean useTezSession = conf.getBoolean("USE_TEZ_SESSION", true); long interJobSleepTimeout = conf.getInt("INTER_JOB_SLEEP_INTERVAL", 0) * 1000; + + boolean retainStagingDir = conf.getBoolean("RETAIN_STAGING_DIR", false); + if (((otherArgs.length%2) != 0) || (!useTezSession && otherArgs.length != 2)) { printUsage(); @@ -506,7 +509,9 @@ public class OrderedWordCount { } } } finally { - fs.delete(stagingDir, true); + if (!retainStagingDir) { + fs.delete(stagingDir, true); + } if (useTezSession) { tezSession.stop(); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerTask.java index 7dc5978..c0a2574 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ContainerTask.java @@ -150,4 +150,5 @@ public class ContainerTask implements Writable { } return sb.toString(); } + } 
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java new file mode 100644 index 0000000..8e17508 --- /dev/null +++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.common; + +import com.google.protobuf.ByteString; +import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; +import org.apache.tez.runtime.api.events.DataMovementEvent; +import org.apache.tez.runtime.api.events.EventProtos; +import org.apache.tez.runtime.api.events.RootInputDataInformationEvent; + +public class ProtoConverters { /* Converters between runtime event objects and their EventProtos messages. A null user payload is written as an absent field and read back as null (hasUserPayload), so a round trip preserves null. */ + + public static EventProtos.DataMovementEventProto convertDataMovementEventToProto( + DataMovementEvent event) { + EventProtos.DataMovementEventProto.Builder builder = + EventProtos.DataMovementEventProto.newBuilder(); + builder.setSourceIndex(event.getSourceIndex()). + setTargetIndex(event.getTargetIndex()).setVersion(event.getVersion()); + if (event.getUserPayload() != null) { + builder.setUserPayload(ByteString.copyFrom(event.getUserPayload())); + } + return builder.build(); + } + + public static DataMovementEvent convertDataMovementEventFromProto( + EventProtos.DataMovementEventProto proto) { /* Uses hasUserPayload(): generated getters never return null, so a != null test would turn an absent payload into an empty array. */ + return new DataMovementEvent(proto.getSourceIndex(), + proto.getTargetIndex(), + proto.getVersion(), + proto.hasUserPayload() ? 
+ proto.getUserPayload().toByteArray() : null); + } + + public static EventProtos.CompositeEventProto convertCompositeDataMovementEventToProto( + CompositeDataMovementEvent event) { + EventProtos.CompositeEventProto.Builder builder = + EventProtos.CompositeEventProto.newBuilder(); + builder.setStartIndex(event.getSourceIndexStart()); + builder.setEndIndex(event.getSourceIndexEnd()); + if (event.getUserPayload() != null) { + builder.setUserPayload(ByteString.copyFrom(event.getUserPayload())); + } + return builder.build(); + } + + public static CompositeDataMovementEvent convertCompositeDataMovementEventFromProto( + EventProtos.CompositeEventProto proto) { + return new CompositeDataMovementEvent(proto.getStartIndex(), + proto.getEndIndex(), + proto.hasUserPayload() ?
+ proto.getUserPayload().toByteArray() : null); + } + + public static EventProtos.RootInputDataInformationEventProto + convertRootInputDataInformationEventToProto(RootInputDataInformationEvent event) { + EventProtos.RootInputDataInformationEventProto.Builder builder = + EventProtos.RootInputDataInformationEventProto.newBuilder(); + builder.setIndex(event.getIndex()); + if (event.getUserPayload() != null) { + builder.setUserPayload(ByteString.copyFrom(event.getUserPayload())); + } + return builder.build(); + } + + public static RootInputDataInformationEvent + convertRootInputDataInformationEventFromProto( + EventProtos.RootInputDataInformationEventProto proto) { /* Same absent-payload handling as convertDataMovementEventFromProto. */ + return new RootInputDataInformationEvent(proto.getIndex(), + proto.hasUserPayload() ? + proto.getUserPayload().toByteArray() : null); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/14127af1/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java index 51042c2..ecadeb5 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java @@ -23,6 +23,7 @@ import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; +import org.apache.tez.common.ProtoConverters; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; @@ -124,24 +125,14 @@ public class TezEvent implements Writable { byte[] eventBytes = null; switch (eventType) { case DATA_MOVEMENT_EVENT: - DataMovementEvent dmEvt = (DataMovementEvent) event; - 
DataMovementEventProto.Builder dmBuilder =
DataMovementEventProto.newBuilder(); - dmBuilder.setSourceIndex(dmEvt.getSourceIndex()). - setTargetIndex(dmEvt.getTargetIndex()).setVersion(dmEvt.getVersion()); - if (dmEvt.getUserPayload() != null) { - dmBuilder.setUserPayload(ByteString.copyFrom(dmEvt.getUserPayload())); - } - eventBytes = dmBuilder.build().toByteArray(); + eventBytes = + ProtoConverters.convertDataMovementEventToProto( + (DataMovementEvent) event).toByteArray(); break; case COMPOSITE_DATA_MOVEMENT_EVENT: - CompositeDataMovementEvent cEvent = (CompositeDataMovementEvent) event; - CompositeEventProto.Builder cBuilder = CompositeEventProto.newBuilder(); - cBuilder.setStartIndex(cEvent.getSourceIndexStart()); - cBuilder.setEndIndex(cEvent.getSourceIndexEnd()); - if (cEvent.getUserPayload() != null) { - cBuilder.setUserPayload(ByteString.copyFrom(cEvent.getUserPayload())); - } - eventBytes = cBuilder.build().toByteArray(); + eventBytes = + ProtoConverters.convertCompositeDataMovementEventToProto( + (CompositeDataMovementEvent) event).toByteArray(); break; case VERTEX_MANAGER_EVENT: VertexManagerEvent vmEvt = (VertexManagerEvent) event; @@ -178,14 +169,8 @@ public class TezEvent implements Writable { .setVersion(ifEvt.getVersion()).build().toByteArray(); break; case ROOT_INPUT_DATA_INFORMATION_EVENT: - RootInputDataInformationEvent liEvent = (RootInputDataInformationEvent) event; - RootInputDataInformationEventProto.Builder riBuilder = - RootInputDataInformationEventProto.newBuilder(); - riBuilder.setIndex(liEvent.getIndex()); - if (liEvent.getUserPayload() != null) { - riBuilder.setUserPayload(ByteString.copyFrom(liEvent.getUserPayload())); - } - eventBytes = riBuilder.build().toByteArray(); + eventBytes = ProtoConverters.convertRootInputDataInformationEventToProto( + (RootInputDataInformationEvent) event).toByteArray(); break; default: throw new TezUncheckedException("Unknown TezEvent" @@ -214,15 +199,11 @@ public class TezEvent implements Writable { case DATA_MOVEMENT_EVENT: DataMovementEventProto 
dmProto = DataMovementEventProto.parseFrom(eventBytes); - event = new DataMovementEvent(dmProto.getSourceIndex(), - dmProto.getTargetIndex(), - dmProto.getVersion(), - dmProto.getUserPayload() != null ? dmProto.getUserPayload().toByteArray() : null); + event = ProtoConverters.convertDataMovementEventFromProto(dmProto); break; case COMPOSITE_DATA_MOVEMENT_EVENT: CompositeEventProto cProto = CompositeEventProto.parseFrom(eventBytes); - event = new CompositeDataMovementEvent(cProto.getStartIndex(), cProto.getEndIndex(), - cProto.hasUserPayload() ? cProto.getUserPayload().toByteArray() : null); + event = ProtoConverters.convertCompositeDataMovementEventFromProto(cProto); break; case VERTEX_MANAGER_EVENT: VertexManagerEventProto vmProto = @@ -253,8 +234,7 @@ public class TezEvent implements Writable { case ROOT_INPUT_DATA_INFORMATION_EVENT: RootInputDataInformationEventProto difProto = RootInputDataInformationEventProto .parseFrom(eventBytes); - event = new RootInputDataInformationEvent(difProto.getIndex(), - difProto.getUserPayload() != null ? difProto.getUserPayload().toByteArray() : null); + event = ProtoConverters.convertRootInputDataInformationEventFromProto(difProto); break; default: // RootInputUpdatePayload event not wrapped in a TezEvent.