Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D186D10147 for ; Mon, 23 Sep 2013 17:48:28 +0000 (UTC) Received: (qmail 94693 invoked by uid 500); 23 Sep 2013 17:48:21 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 94422 invoked by uid 500); 23 Sep 2013 17:48:10 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 94302 invoked by uid 99); 23 Sep 2013 17:48:07 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Sep 2013 17:48:07 +0000 X-ASF-Spam-Status: No, hits=-2002.3 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 23 Sep 2013 17:46:31 +0000 Received: (qmail 83274 invoked by uid 99); 23 Sep 2013 17:45:20 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Sep 2013 17:45:20 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id B50F99076ED; Mon, 23 Sep 2013 17:45:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.incubator.apache.org Date: Mon, 23 Sep 2013 17:45:32 -0000 Message-Id: <87a192e3ae094bc991094d851d389383@git.apache.org> In-Reply-To: <443f608461004be4a7082ca4f304c566@git.apache.org> References: <443f608461004be4a7082ca4f304c566@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [18/20] TEZ-443. 
Merge tez-dag-api and tez-engine-api into a single module - tez-api (part of TEZ-398). (sseth) X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java new file mode 100644 index 0000000..9cb602c --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tez.dag.api; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Stack; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.dag.api.EdgeProperty.DataMovementType; +import org.apache.tez.dag.api.EdgeProperty.DataSourceType; +import org.apache.tez.dag.api.EdgeProperty.SchedulingType; +import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; +import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; +import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; +import org.apache.tez.dag.api.records.DAGProtos.EdgePlan; +import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; +import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResource; +import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration; +import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint; +import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType; +import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; + + +public class DAG { // FIXME rename to Topology + final List vertices; + final List edges; + final String name; + + public DAG(String name) { + this.vertices = new ArrayList(); + this.edges = new ArrayList(); + this.name = name; + } + + public synchronized DAG addVertex(Vertex vertex) { + if (vertices.contains(vertex)) { + throw new IllegalArgumentException( + "Vertex " + vertex + " already defined!"); + } + vertices.add(vertex); + return this; + } + + @Private + public synchronized List getVertices() { + return Collections.unmodifiableList(this.vertices); + } + + public synchronized DAG addEdge(Edge 
edge) { + // Sanity checks + if (!vertices.contains(edge.getInputVertex())) { + throw new IllegalArgumentException( + "Input vertex " + edge.getInputVertex() + " doesn't exist!"); + } + if (!vertices.contains(edge.getOutputVertex())) { + throw new IllegalArgumentException( + "Output vertex " + edge.getOutputVertex() + " doesn't exist!"); + } + if (edges.contains(edge)) { + throw new IllegalArgumentException( + "Edge " + edge + " already defined!"); + } + + // Inform the vertices + edge.getInputVertex().addOutputVertex(edge.getOutputVertex(), edge.getId()); + edge.getOutputVertex().addInputVertex(edge.getInputVertex(), edge.getId()); + + edges.add(edge); + return this; + } + + public String getName() { + return this.name; + } + + // AnnotatedVertex is used by verify() + private static class AnnotatedVertex { + Vertex v; + + int index; //for Tarjan's algorithm + int lowlink; //for Tarjan's algorithm + boolean onstack; //for Tarjan's algorithm + + int outDegree; + + private AnnotatedVertex(Vertex v){ + this.v = v; + index = -1; + lowlink = -1; + outDegree = 0; + } + } + + // verify() + // + // Default rules + // Illegal: + // - duplicate vertex id + // - cycles + // + // Ok: + // - orphaned vertex. Occurs in map-only + // - islands. Occurs if job has unrelated workflows. + // + // Not yet categorized: + // - orphaned vertex in DAG of >1 vertex. Could be unrelated map-only job. + // - v1->v2 via two edges. perhaps some self-join job would use this? + // + // "restricted" mode: + // In short term, the supported DAGs are limited. Call with restricted=true for these verifications. + // Illegal: + // - any vertex with more than one input or output edge. 
(n-ary input, n-ary merge) + public void verify() throws IllegalStateException { + verify(true); + } + + public void verify(boolean restricted) throws IllegalStateException { + if (vertices.isEmpty()) { + throw new IllegalStateException("Invalid dag containing 0 vertices"); + } + + Map> edgeMap = new HashMap>(); + for(Edge e : edges){ + Vertex inputVertex = e.getInputVertex(); + List edgeList = edgeMap.get(inputVertex); + if(edgeList == null){ + edgeList = new ArrayList(); + edgeMap.put(inputVertex, edgeList); + } + edgeList.add(e); + } + + // check for valid vertices, duplicate vertex names, + // and prepare for cycle detection + Map vertexMap = new HashMap(); + for(Vertex v : vertices){ + if(vertexMap.containsKey(v.getVertexName())){ + throw new IllegalStateException("DAG contains multiple vertices" + + " with name: " + v.getVertexName()); + } + vertexMap.put(v.getVertexName(), new AnnotatedVertex(v)); + } + + detectCycles(edgeMap, vertexMap); + + if(restricted){ + for(Edge e : edges){ + vertexMap.get(e.getInputVertex().getVertexName()).outDegree++; + if (e.getEdgeProperty().getDataMovementType() != + DataMovementType.SCATTER_GATHER) { + throw new IllegalStateException( + "Unsupported connection pattern on edge. " + e); + } + if (e.getEdgeProperty().getDataSourceType() != + DataSourceType.PERSISTED) { + throw new IllegalStateException( + "Unsupported source type on edge. " + e); + } + if (e.getEdgeProperty().getSchedulingType() != + SchedulingType.SEQUENTIAL) { + throw new IllegalStateException( + "Unsupported scheduling type on edge. " + e); + } + } + for(AnnotatedVertex av: vertexMap.values()){ + if (av.outDegree > 1) { + throw new IllegalStateException("Vertex has outDegree>1: " + + av.v.getVertexName()); + } + } + } + } + + // Adaptation of Tarjan's algorithm for connected components. 
+ // http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm + private void detectCycles(Map> edgeMap, Map vertexMap) + throws IllegalStateException{ + Integer nextIndex = 0; // boxed integer so it is passed by reference. + Stack stack = new Stack(); + for(AnnotatedVertex av: vertexMap.values()){ + if(av.index == -1){ + assert stack.empty(); + strongConnect(av, vertexMap, edgeMap, stack, nextIndex); + } + } + } + + // part of Tarjan's algorithm for connected components. + // http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm + private void strongConnect( + AnnotatedVertex av, + Map vertexMap, + Map> edgeMap, + Stack stack, Integer nextIndex) throws IllegalStateException{ + av.index = nextIndex; + av.lowlink = nextIndex; + nextIndex++; + stack.push(av); + av.onstack = true; + + List edges = edgeMap.get(av.v); + if(edges != null){ + for(Edge e : edgeMap.get(av.v)){ + AnnotatedVertex outVertex = vertexMap.get(e.getOutputVertex().getVertexName()); + if(outVertex.index == -1){ + strongConnect(outVertex, vertexMap, edgeMap, stack, nextIndex); + av.lowlink = Math.min(av.lowlink, outVertex.lowlink); + } + else if(outVertex.onstack){ + // strongly connected component detected, but we will wait till later so that the full cycle can be displayed. + // update lowlink in case outputVertex should be considered the root of this component. + av.lowlink = Math.min(av.lowlink, outVertex.index); + } + } + } + + if(av.lowlink == av.index ){ + AnnotatedVertex pop = stack.pop(); + pop.onstack = false; + if(pop != av){ + // there was something on the stack other than this "av". + // this indicates there is a scc/cycle. 
It comprises all nodes from top of stack to "av" + StringBuilder message = new StringBuilder(); + message.append(av.v.getVertexName() + " <- "); + for( ; pop != av; pop = stack.pop()){ + message.append(pop.v.getVertexName() + " <- "); + pop.onstack = false; + } + message.append(av.v.getVertexName()); + throw new IllegalStateException("DAG contains a cycle: " + message); + } + } + } + + + // create protobuf message describing DAG + @Private + public DAGPlan createDag(Configuration dagConf) { + verify(true); + + DAGPlan.Builder dagBuilder = DAGPlan.newBuilder(); + + dagBuilder.setName(this.name); + + for(Vertex vertex : vertices){ + VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder(); + vertexBuilder.setName(vertex.getVertexName()); + vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until TEZ-46. + vertexBuilder.setProcessorDescriptor(DagTypeConverters + .convertToDAGPlan(vertex.getProcessorDescriptor())); + + //task config + PlanTaskConfiguration.Builder taskConfigBuilder = PlanTaskConfiguration.newBuilder(); + Resource resource = vertex.getTaskResource(); + taskConfigBuilder.setNumTasks(vertex.getParallelism()); + taskConfigBuilder.setMemoryMb(resource.getMemory()); + taskConfigBuilder.setVirtualCores(resource.getVirtualCores()); + taskConfigBuilder.setJavaOpts(vertex.getJavaOpts()); + + taskConfigBuilder.setTaskModule(vertex.getVertexName()); + PlanLocalResource.Builder localResourcesBuilder = PlanLocalResource.newBuilder(); + Map lrs = vertex.getTaskLocalResources(); + for(Entry entry : lrs.entrySet()){ + String key = entry.getKey(); + LocalResource lr = entry.getValue(); + localResourcesBuilder.setName(key); + localResourcesBuilder.setUri( + DagTypeConverters.convertToDAGPlan(lr.getResource())); + localResourcesBuilder.setSize(lr.getSize()); + localResourcesBuilder.setTimeStamp(lr.getTimestamp()); + localResourcesBuilder.setType( + DagTypeConverters.convertToDAGPlan(lr.getType())); + localResourcesBuilder.setVisibility( + 
DagTypeConverters.convertToDAGPlan(lr.getVisibility())); + if(lr.getType() == LocalResourceType.PATTERN){ + if (lr.getPattern() == null || lr.getPattern().isEmpty()) { + throw new TezUncheckedException("LocalResource type set to pattern" + + " but pattern is null or empty"); + } + localResourcesBuilder.setPattern(lr.getPattern()); + } + taskConfigBuilder.addLocalResource(localResourcesBuilder); + } + + if(vertex.getTaskEnvironment() != null){ + for(String key : vertex.getTaskEnvironment().keySet()){ + PlanKeyValuePair.Builder envSettingBuilder = PlanKeyValuePair.newBuilder(); + envSettingBuilder.setKey(key); + envSettingBuilder.setValue(vertex.getTaskEnvironment().get(key)); + taskConfigBuilder.addEnvironmentSetting(envSettingBuilder); + } + } + + if(vertex.getTaskLocationsHint() != null ){ + if(vertex.getTaskLocationsHint().getTaskLocationHints() != null){ + for(TaskLocationHint hint : vertex.getTaskLocationsHint().getTaskLocationHints()){ + PlanTaskLocationHint.Builder taskLocationHintBuilder = PlanTaskLocationHint.newBuilder(); + + if(hint.getDataLocalHosts() != null){ + taskLocationHintBuilder.addAllHost(hint.getDataLocalHosts()); + } + if(hint.getRacks() != null){ + taskLocationHintBuilder.addAllRack(hint.getRacks()); + } + + vertexBuilder.addTaskLocationHint(taskLocationHintBuilder); + } + } + } + + for(String inEdgeId : vertex.getInputEdgeIds()){ + vertexBuilder.addInEdgeId(inEdgeId); + } + + for(String outEdgeId : vertex.getOutputEdgeIds()){ + vertexBuilder.addOutEdgeId(outEdgeId); + } + + vertexBuilder.setTaskConfig(taskConfigBuilder); + dagBuilder.addVertex(vertexBuilder); + } + + for(Edge edge : edges){ + EdgePlan.Builder edgeBuilder = EdgePlan.newBuilder(); + edgeBuilder.setId(edge.getId()); + edgeBuilder.setInputVertexName(edge.getInputVertex().getVertexName()); + edgeBuilder.setOutputVertexName(edge.getOutputVertex().getVertexName()); + edgeBuilder.setDataMovementType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getDataMovementType())); + 
edgeBuilder.setDataSourceType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getDataSourceType())); + edgeBuilder.setSchedulingType(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getSchedulingType())); + edgeBuilder.setEdgeSource(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeSource())); + edgeBuilder.setEdgeDestination(DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeDestination())); + dagBuilder.addEdge(edgeBuilder); + } + + if(dagConf != null) { + Iterator> iter = dagConf.iterator(); + ConfigurationProto.Builder confProtoBuilder = + ConfigurationProto.newBuilder(); + while (iter.hasNext()) { + Entry entry = iter.next(); + PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder(); + kvp.setKey(entry.getKey()); + kvp.setValue(entry.getValue()); + confProtoBuilder.addConfKeyValues(kvp); + } + dagBuilder.setDagKeyValues(confProtoBuilder); + } + + return dagBuilder.build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java new file mode 100644 index 0000000..1fd78f1 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java @@ -0,0 +1,278 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.dag.api; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.tez.dag.api.EdgeProperty.DataMovementType; +import org.apache.tez.dag.api.EdgeProperty.DataSourceType; +import org.apache.tez.dag.api.EdgeProperty.SchedulingType; +import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; +import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; +import org.apache.tez.dag.api.records.DAGProtos.EdgePlan; +import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType; +import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataSourceType; +import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSchedulingType; +import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; +import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResource; +import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceType; +import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceVisibility; +import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration; 
+import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint; +import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto; + +import com.google.protobuf.ByteString; + + +public class DagTypeConverters { + + public static PlanLocalResourceVisibility convertToDAGPlan(LocalResourceVisibility visibility){ + switch(visibility){ + case PUBLIC : return PlanLocalResourceVisibility.PUBLIC; + case PRIVATE : return PlanLocalResourceVisibility.PRIVATE; + case APPLICATION : return PlanLocalResourceVisibility.APPLICATION; + default : throw new RuntimeException("unknown 'visibility': " + visibility); + } + } + + public static LocalResourceVisibility convertFromDAGPlan(PlanLocalResourceVisibility visibility){ + switch(visibility){ + case PUBLIC : return LocalResourceVisibility.PUBLIC; + case PRIVATE : return LocalResourceVisibility.PRIVATE; + case APPLICATION : return LocalResourceVisibility.APPLICATION; + default : throw new RuntimeException("unknown 'visibility': " + visibility); + } + } + + public static PlanEdgeDataSourceType convertToDAGPlan(DataSourceType sourceType){ + switch(sourceType){ + case PERSISTED : return PlanEdgeDataSourceType.PERSISTED; + case PERSISTED_RELIABLE : return PlanEdgeDataSourceType.PERSISTED_RELIABLE; + case EPHEMERAL : return PlanEdgeDataSourceType.EPHEMERAL; + default : throw new RuntimeException("unknown 'dataSourceType': " + sourceType); + } + } + + public static DataSourceType convertFromDAGPlan(PlanEdgeDataSourceType sourceType){ + switch(sourceType){ + case PERSISTED : return DataSourceType.PERSISTED; + case PERSISTED_RELIABLE : return DataSourceType.PERSISTED_RELIABLE; + case EPHEMERAL : return DataSourceType.EPHEMERAL; + default : throw new RuntimeException("unknown 'dataSourceType': " + sourceType); + } + } + + public static PlanEdgeDataMovementType convertToDAGPlan(DataMovementType type){ + switch(type){ + case ONE_TO_ONE : return PlanEdgeDataMovementType.ONE_TO_ONE; + case BROADCAST : return 
PlanEdgeDataMovementType.BROADCAST; + case SCATTER_GATHER : return PlanEdgeDataMovementType.SCATTER_GATHER; + default : throw new RuntimeException("unknown 'dataMovementType': " + type); + } + } + + public static DataMovementType convertFromDAGPlan(PlanEdgeDataMovementType type){ + switch(type){ + case ONE_TO_ONE : return DataMovementType.ONE_TO_ONE; + case BROADCAST : return DataMovementType.BROADCAST; + case SCATTER_GATHER : return DataMovementType.SCATTER_GATHER; + default : throw new IllegalArgumentException("unknown 'dataMovementType': " + type); + } + } + + public static PlanEdgeSchedulingType convertToDAGPlan(SchedulingType type){ + switch(type){ + case SEQUENTIAL : return PlanEdgeSchedulingType.SEQUENTIAL; + case CONCURRENT : return PlanEdgeSchedulingType.CONCURRENT; + default : throw new RuntimeException("unknown 'SchedulingType': " + type); + } + } + + public static SchedulingType convertFromDAGPlan(PlanEdgeSchedulingType type){ + switch(type){ + case SEQUENTIAL : return SchedulingType.SEQUENTIAL; + case CONCURRENT : return SchedulingType.CONCURRENT; + default : throw new IllegalArgumentException("unknown 'SchedulingType': " + type); + } + } + + public static PlanLocalResourceType convertToDAGPlan(LocalResourceType type) { + switch(type){ + case ARCHIVE : return PlanLocalResourceType.ARCHIVE; + case FILE : return PlanLocalResourceType.FILE; + case PATTERN : return PlanLocalResourceType.PATTERN; + default : throw new IllegalArgumentException("unknown 'type': " + type); + } + } + + public static LocalResourceType convertFromDAGPlan(PlanLocalResourceType type) { + switch(type){ + case ARCHIVE : return LocalResourceType.ARCHIVE; + case FILE : return LocalResourceType.FILE; + case PATTERN : return LocalResourceType.PATTERN; + default : throw new IllegalArgumentException("unknown 'type': " + type); + } + } + + public static VertexLocationHint convertFromDAGPlan( + List locationHints) { + + List outputList = new ArrayList(); + + for(PlanTaskLocationHint 
inputHint : locationHints){ + TaskLocationHint outputHint = new TaskLocationHint( + new HashSet(inputHint.getHostList()), + new HashSet(inputHint.getRackList())); + outputList.add(outputHint); + } + return new VertexLocationHint(outputList.size(), outputList); + } + + // notes re HDFS URL handling: + // Resource URLs in the protobuf message are strings of the form hdfs://host:port/path + // org.apache.hadoop.fs.Path.Path is actually a URI type that allows any scheme + // org.apache.hadoop.yarn.api.records.URL is a URL type used by YARN. + // java.net.URL cannot be used out of the box as it rejects unknown schemes such as HDFS. + + public static String convertToDAGPlan(URL resource) { + // see above notes on HDFS URL handling + String out = resource.getScheme() + "://" + resource.getHost() + ":" + resource.getPort() + + resource.getFile(); + return out; + } + + public static Map createLocalResourceMapFromDAGPlan( + List localResourcesList) { + Map map = new HashMap(); + for(PlanLocalResource res : localResourcesList){ + LocalResource r = new LocalResourcePBImpl(); + + //NOTE: have to check every optional field in protobuf generated classes for existence before accessing + //else we will receive a default value back, eg "" + if(res.hasPattern()){ + r.setPattern(res.getPattern()); + } + r.setResource(ConverterUtils.getYarnUrlFromPath(new Path(res.getUri()))); // see above notes on HDFS URL handling + r.setSize(res.getSize()); + r.setTimestamp(res.getTimeStamp()); + r.setType(DagTypeConverters.convertFromDAGPlan(res.getType())); + r.setVisibility(DagTypeConverters.convertFromDAGPlan(res.getVisibility())); + map.put(res.getName(), r); + } + return map; + } + + public static Map createEnvironmentMapFromDAGPlan( + List environmentSettingList) { + + Map map = new HashMap(); + for(PlanKeyValuePair setting : environmentSettingList){ + map.put(setting.getKey(), setting.getValue()); + } + + return map; + } + + public static Map createEdgePlanMapFromDAGPlan(List edgeList){ + 
Map edgePlanMap = + new HashMap(); + for(EdgePlan edgePlanItem : edgeList){ + edgePlanMap.put(edgePlanItem.getId(), edgePlanItem); + } + return edgePlanMap; + } + + public static EdgeProperty createEdgePropertyMapFromDAGPlan(EdgePlan edge) { + return new EdgeProperty( + convertFromDAGPlan(edge.getDataMovementType()), + convertFromDAGPlan(edge.getDataSourceType()), + convertFromDAGPlan(edge.getSchedulingType()), + convertOutputDescriptorFromDAGPlan(edge.getEdgeSource()), + convertInputDescriptorFromDAGPlan(edge.getEdgeDestination()) + ); + } + + public static Resource createResourceRequestFromTaskConfig( + PlanTaskConfiguration taskConfig) { + return Resource.newInstance(taskConfig.getMemoryMb(), taskConfig.getVirtualCores()); + } + + public static Map convertConfFromProto( + ConfigurationProto confProto) { + List settingList = confProto.getConfKeyValuesList(); + Map map = new HashMap(); + for(PlanKeyValuePair setting: settingList){ + map.put(setting.getKey(), setting.getValue()); + } + return map; + } + + public static TezEntityDescriptorProto convertToDAGPlan( + TezEntityDescriptor descriptor) { + TezEntityDescriptorProto.Builder builder = TezEntityDescriptorProto + .newBuilder(); + builder.setClassName(descriptor.getClassName()); + if (descriptor.getUserPayload() != null) { + builder + .setUserPayload(ByteString.copyFrom(descriptor.getUserPayload())); + } + return builder.build(); + } + + public static InputDescriptor convertInputDescriptorFromDAGPlan( + TezEntityDescriptorProto proto) { + String className = proto.getClassName(); + byte[] bb = null; + if (proto.hasUserPayload()) { + bb = proto.getUserPayload().toByteArray(); + } + return new InputDescriptor(className).setUserPayload(bb); + } + + public static OutputDescriptor convertOutputDescriptorFromDAGPlan( + TezEntityDescriptorProto proto) { + String className = proto.getClassName(); + byte[] bb = null; + if (proto.hasUserPayload()) { + bb = proto.getUserPayload().toByteArray(); + } + return new 
OutputDescriptor(className).setUserPayload(bb); + } + + public static ProcessorDescriptor convertProcessorDescriptorFromDAGPlan( + TezEntityDescriptorProto proto) { + String className = proto.getClassName(); + byte[] bb = null; + if (proto.hasUserPayload()) { + bb = proto.getUserPayload().toByteArray(); + } + return new ProcessorDescriptor(className).setUserPayload(bb); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java b/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java new file mode 100644 index 0000000..a893bc3 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tez.dag.api; + +public class Edge{ + + private final Vertex inputVertex; + private final Vertex outputVertex; + private final EdgeProperty edgeProperty; + + public Edge(Vertex inputVertex, + Vertex outputVertex, + EdgeProperty edgeProperty) { + this.inputVertex = inputVertex; + this.outputVertex = outputVertex; + this.edgeProperty = edgeProperty; + } + + // RENAME to source and destination + public Vertex getInputVertex() { + return inputVertex; + } + + public Vertex getOutputVertex() { + return outputVertex; + } + + public EdgeProperty getEdgeProperty() { + return edgeProperty; + } + + /* + * Used to identify the edge in the configuration + */ + public String getId() { + return String.valueOf(this.hashCode()); + } + + @Override + public String toString() { + return inputVertex + " -> " + outputVertex + " (" + edgeProperty + ")"; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java new file mode 100644 index 0000000..326d3d0 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.dag.api; + +public class EdgeProperty { + + /** + * Defines the manner of data movement between source and destination tasks. + * Determines which destination tasks have access to data produced on this + * edge by a source task. A destination task may choose to read any portion of + * the data available to it. + */ + public enum DataMovementType { + /** + * Output on this edge produced by the i-th source task is available to the + * i-th destination task. + */ + ONE_TO_ONE, + /** + * Output on this edge produced by any source task is available to all + * destination tasks. + */ + BROADCAST, + /** + * The i-th output on this edge produced by all source tasks is available to + * the same destination task. Source tasks scatter their outputs and they + * are gathered by designated destination tasks. + */ + SCATTER_GATHER + } + + /** + * Determines the lifetime of the data produced on this edge by a source task. + */ + public enum DataSourceType { + /** + * Data produced by the source is persisted and available even when the + * task is not running. The data may become unavailable and may cause the + * source task to be re-executed. + */ + PERSISTED, + /** + * Source data is stored reliably and will always be available + */ + PERSISTED_RELIABLE, + /** + * Data produced by the source task is available only while the source task + * is running. This requires the destination task to run concurrently with + * the source task. 
+ */ + EPHEMERAL + } + + /** + * Determines when the destination task is eligible to run, once the source + * task is eligible to run. + */ + public enum SchedulingType { + /** + * Destination task is eligible to run after one or more of its source tasks + * have started or completed. + */ + SEQUENTIAL, + /** + * Destination task must run concurrently with the source task + */ + CONCURRENT + } + + DataMovementType dataMovementType; + DataSourceType dataSourceType; + SchedulingType schedulingType; + InputDescriptor inputDescriptor; + OutputDescriptor outputDescriptor; + + /** + * @param dataMovementType + * @param dataSourceType + * @param edgeSource + * The {@link OutputDescriptor} that generates data on the edge. + * @param edgeDestination + * The {@link InputDescriptor} which will consume data from the edge. + */ + public EdgeProperty(DataMovementType dataMovementType, + DataSourceType dataSourceType, + SchedulingType schedulingType, + OutputDescriptor edgeSource, + InputDescriptor edgeDestination) { + this.dataMovementType = dataMovementType; + this.dataSourceType = dataSourceType; + this.schedulingType = schedulingType; + this.inputDescriptor = edgeDestination; + this.outputDescriptor = edgeSource; + } + + public DataMovementType getDataMovementType() { + return dataMovementType; + } + + public DataSourceType getDataSourceType() { + return dataSourceType; + } + + public SchedulingType getSchedulingType() { + return schedulingType; + } + + /** + * Returns the {@link InputDescriptor} which will consume data from the edge. + * + * @return + */ + public InputDescriptor getEdgeDestination() { + return inputDescriptor; + } + + /** + * Returns the {@link OutputDescriptor} which produces data on the edge. 
+ * + * @return + */ + public OutputDescriptor getEdgeSource() { + return outputDescriptor; + } + + @Override + public String toString() { + return "{ " + dataMovementType + " : " + inputDescriptor.getClassName() + + " >> " + dataSourceType + " >> " + outputDescriptor.getClassName() + " }"; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java new file mode 100644 index 0000000..dea9001 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/InputDescriptor.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.api; + +public class InputDescriptor extends TezEntityDescriptor { + + public InputDescriptor(String inputClassName) { + super(inputClassName); + } + + @Override + public InputDescriptor setUserPayload(byte[] userPayload) { + this.userPayload = userPayload; + return this; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java new file mode 100644 index 0000000..16fb9b1 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/OutputDescriptor.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.api; + +public class OutputDescriptor extends TezEntityDescriptor { + + public OutputDescriptor(String outputClassName) { + super(outputClassName); + } + + @Override + public OutputDescriptor setUserPayload(byte[] userPayload) { + this.userPayload = userPayload; + return this; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java new file mode 100644 index 0000000..092147d --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/ProcessorDescriptor.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.api; + +public class ProcessorDescriptor extends TezEntityDescriptor { + + public ProcessorDescriptor(String processorClassName) { + super(processorClassName); + } + + public ProcessorDescriptor setUserPayload(byte[] userPayload) { + this.userPayload = userPayload; + return this; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java new file mode 100644 index 0000000..7447974 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.api; + +import org.apache.hadoop.conf.Configuration; + +public class TezConfiguration extends Configuration { + + public final static String TEZ_SITE_XML = "tez-site.xml"; + + static { + addDefaultResource(TEZ_SITE_XML); + } + + public TezConfiguration() { + super(); + } + + public TezConfiguration(Configuration conf) { + super(conf); + } + + public static final String TEZ_PREFIX = "tez."; + public static final String TEZ_AM_PREFIX = TEZ_PREFIX + "am."; + public static final String TEZ_TASK_PREFIX = TEZ_PREFIX + "task."; + + public static final String TEZ_AM_STAGING_DIR = TEZ_PREFIX + "staging-dir"; + public static final String TEZ_AM_STAGING_DIR_DEFAULT = "/tmp/tez/staging"; + + // TODO Should not be required once all tokens are handled via AppSubmissionContext + public static final String JOB_SUBMIT_DIR = TEZ_PREFIX + "jobSubmitDir"; + public static final String APPLICATION_TOKENS_FILE = "appTokens"; + public static final String TEZ_APPLICATION_MASTER_CLASS = + "org.apache.tez.dag.app.DAGAppMaster"; + + /** Root Logging level passed to the Tez app master.*/ + public static final String TEZ_AM_LOG_LEVEL = TEZ_AM_PREFIX+"log.level"; + public static final String TEZ_AM_LOG_LEVEL_DEFAULT = "INFO"; + + public static final String TEZ_AM_JAVA_OPTS = TEZ_AM_PREFIX + + "java.opts"; + public static final String DEFAULT_TEZ_AM_JAVA_OPTS = " -Xmx1024m "; + + public static final String TEZ_AM_CANCEL_DELEGATION_TOKEN = TEZ_AM_PREFIX + + "am.complete.cancel.delegation.tokens"; + public static final boolean TEZ_AM_CANCEL_DELEGATION_TOKEN_DEFAULT = true; + + public static final String TEZ_AM_TASK_LISTENER_THREAD_COUNT = + TEZ_AM_PREFIX + "task.listener.thread-count"; + public static final int TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT = 30; + + public static final String TEZ_AM_CONTAINER_LISTENER_THREAD_COUNT = + TEZ_AM_PREFIX + "container.listener.thread-count"; + public static final int TEZ_AM_CONTAINER_LISTENER_THREAD_COUNT_DEFAULT = 30; + + 
// TODO Some of the DAG properties are job specific and not AM specific. Rename accordingly. + // TODO Are any of these node blacklisting properties required. (other than for MR compat) + public static final String TEZ_AM_MAX_TASK_FAILURES_PER_NODE = TEZ_AM_PREFIX + + "maxtaskfailures.per.node"; + public static final int TEZ_AM_MAX_TASK_FAILURES_PER_NODE_DEFAULT = 3; + + public static final String TEZ_AM_MAX_TASK_ATTEMPTS = + TEZ_AM_PREFIX + "max.task.attempts"; + public static final int TEZ_AM_MAX_TASK_ATTEMPTS_DEFAULT = 4; + + public static final String TEZ_AM_NODE_BLACKLISTING_ENABLED = TEZ_AM_PREFIX + + "node-blacklisting.enabled"; + public static final boolean TEZ_AM_NODE_BLACKLISTING_ENABLED_DEFAULT = true; + public static final String TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD = TEZ_AM_PREFIX + + "node-blacklisting.ignore-threshold-node-percent"; + public static final int TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT = 33; + + /** Number of threads to handle job client RPC requests.*/ + public static final String TEZ_AM_CLIENT_THREAD_COUNT = + TEZ_AM_PREFIX + "client.am.thread-count"; + public static final int TEZ_AM_CLIENT_THREAD_COUNT_DEFAULT = 1; + /** + * Range of ports that the AM can use when binding. Leave blank + * if you want all possible ports. 
+ */ + public static final String TEZ_AM_CLIENT_AM_PORT_RANGE = + TEZ_AM_PREFIX + "client.am.port-range"; + + + public static final String TEZ_AM_RESOURCE_MEMORY_MB = TEZ_AM_PREFIX + + "resource.memory.mb"; + public static final int TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT = 1536; + + public static final String TEZ_AM_RESOURCE_CPU_VCORES = TEZ_AM_PREFIX + + "resource.cpu.vcores"; + public static final int TEZ_AM_RESOURCE_CPU_VCORES_DEFAULT = 1; + + public static final String + TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION = TEZ_AM_PREFIX + + "shuffle-vertex-manager.min-src-fraction"; + public static final float + TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT = 0.25f; + + public static final String + TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION = TEZ_AM_PREFIX + + "shuffle-vertex-manager.max-src-fraction"; + public static final float + TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT = 0.75f; + + public static final String + TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL = TEZ_AM_PREFIX + + "shuffle-vertex-manager.enable.auto-parallel"; + public static final boolean + TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL_DEFAULT = false; + + public static final String + TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE = TEZ_AM_PREFIX + + "shuffle-vertex-manager.desired-task-input-size"; + public static final long + TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT = + 1024*1024*100L; + + public static final String + TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM = TEZ_AM_PREFIX + + "shuffle-vertex-manager.min-task-parallelism"; + public static final int + TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_TASK_PARALLELISM_DEFAULT = 1; + + public static final String + TEZ_AM_SLOWSTART_DAG_SCHEDULER_MIN_SHUFFLE_RESOURCE_FRACTION = TEZ_AM_PREFIX + + "slowstart-dag-scheduler.min-resource-fraction"; + public static final float + TEZ_AM_SLOWSTART_DAG_SCHEDULER_MIN_SHUFFLE_RESOURCE_FRACTION_DEFAULT = 0.5f; + + public static final String 
TEZ_AM_AGGRESSIVE_SCHEDULING = TEZ_AM_PREFIX + + "aggressive.scheduling"; + public static boolean TEZ_AM_AGGRESSIVE_SCHEDULING_DEFAULT = false; + + /** + * The complete path to the serialized dag plan file + * TEZ_AM_PLAN_PB_BINARY. Used to make the plan available to + * individual tasks if needed. This will typically be a path in the job submit + * directory. + */ + public static final String TEZ_AM_PLAN_REMOTE_PATH = TEZ_AM_PREFIX + + "dag-am-plan.remote.path"; + + public static final String TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX = TEZ_AM_PREFIX + + "am-rm.heartbeat.interval-ms.max"; + public static final int TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX_DEFAULT = 1000; + + public static final String TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX = TEZ_TASK_PREFIX + + "get-task.sleep.interval-ms.max"; + public static final int TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT = 500; + + public static final String TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS = TEZ_TASK_PREFIX + + "am.heartbeat.interval-ms.max"; + public static final int TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 100; + + public static final String TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT = TEZ_TASK_PREFIX + + "max-events-per-heartbeat.max"; + public static final int TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT = 100; + + /** + * Configuration to specify whether container should be reused. + */ + public static final String TEZ_AM_CONTAINER_REUSE_ENABLED = TEZ_AM_PREFIX + + "container.reuse.enabled"; + public static final boolean TEZ_AM_CONTAINER_REUSE_ENABLED_DEFAULT = true; + + /** + * Whether to reuse containers for rack local tasks. Active only if reuse is + * enabled. + */ + public static final String TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED = TEZ_AM_PREFIX + + "container.reuse.rack-fallback.enabled"; + public static final boolean TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED_DEFAULT = true; + + /** + * Whether to reuse containers for non-local tasks. Active only if reuse is + * enabled. 
+ */ + public static final String TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED = TEZ_AM_PREFIX + + "container.reuse.non-local-fallback.enabled"; + public static final boolean TEZ_AM_CONTAINER_REUSE_NON_LOCAL_FALLBACK_ENABLED_DEFAULT = false; + + public static final String TEZ_AM_CONTAINER_REUSE_DELAY_ALLOCATION_MILLIS = TEZ_AM_PREFIX + + "container.reuse.delay-allocation-millis"; + public static final long TEZ_AM_CONTAINER_REUSE_DELAY_ALLOCATION_MILLIS_DEFAULT = 3000l; + + public static final String TEZ_PB_BINARY_CONF_NAME = "tez-conf.pb"; + public static final String TEZ_PB_PLAN_BINARY_NAME = "tez-dag.pb"; + public static final String TEZ_PB_PLAN_TEXT_NAME = "tez-dag.pb.txt"; + + /* + * Logger properties + */ + public static final String TEZ_CONTAINER_LOG4J_PROPERTIES_FILE = "tez-container-log4j.properties"; + public static final String TEZ_CONTAINER_LOGGER_NAME = "CLA"; + public static final String TEZ_ROOT_LOGGER_NAME = "tez.root.logger"; + public static final String TEZ_CONTAINER_LOG_FILE_NAME = "syslog"; + public static final String TEZ_CONTAINER_ERR_FILE_NAME = "stderr"; + public static final String TEZ_CONTAINER_OUT_FILE_NAME = "stdout"; + + + public static final String TEZ_LIB_URIS = + TEZ_PREFIX + "lib.uris"; + + public static final String TEZ_APPLICATION_TYPE = "TEZ-MR*"; + + public static final String LOCAL_FRAMEWORK_NAME = "local-tez"; +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java new file mode 100644 index 0000000..5463d65 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
/**
 * Holder for constant values used by Tez.
 */
public class TezConstants {

  /*
   * Names of environment variables.
   */

  /** Environment variable name {@code TEZ_AM_IS_SESSION}. */
  public static final String TEZ_AM_IS_SESSION_ENV = "TEZ_AM_IS_SESSION";

}
/**
 * Base class for descriptors: pairs a class name, fixed at construction, with
 * an optional user payload that can be replaced later.
 */
public abstract class TezEntityDescriptor {

  // Set only in the constructor.
  private final String className;
  // Visible to subclasses, which assign it directly in their own setters.
  protected byte[] userPayload;

  /**
   * @param className class name to report from {@link #getClassName()};
   *          stored as given, without validation
   */
  public TezEntityDescriptor(String className) {
    this.className = className;
  }

  /**
   * @return the class name supplied at construction
   */
  public String getClassName() {
    return className;
  }

  /**
   * @return the payload array last set, or {@code null} if none was set; the
   *         internal array itself is returned, not a copy
   */
  public byte[] getUserPayload() {
    return userPayload;
  }

  /**
   * Replaces the payload. The array reference is stored as given.
   *
   * @param userPayload new payload
   * @return this descriptor
   */
  public TezEntityDescriptor setUserPayload(byte[] userPayload) {
    this.userPayload = userPayload;
    return this;
  }
}
/**
 * Checked base exception for Tez.
 */
public class TezException extends Exception {

  private static final long serialVersionUID = 6337442733802964447L;

  /**
   * @param message detail message
   */
  public TezException(String message) {
    super(message);
  }

  /**
   * @param cause underlying cause
   */
  public TezException(Throwable cause) {
    super(cause);
  }

  /**
   * @param message detail message
   * @param cause underlying cause
   */
  public TezException(String message, Throwable cause) {
    super(message, cause);
  }
}
/**
 * Unchecked base exception for Tez.
 */
public class TezUncheckedException extends RuntimeException {

  private static final long serialVersionUID = -4956339297375386184L;

  /**
   * @param message detail message
   */
  public TezUncheckedException(String message) {
    super(message);
  }

  /**
   * @param cause underlying cause
   */
  public TezUncheckedException(Throwable cause) {
    super(cause);
  }

  /**
   * @param message detail message
   * @param cause underlying cause
   */
  public TezUncheckedException(String message, Throwable cause) {
    super(message, cause);
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.api; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; + +public class Vertex { // FIXME rename to Task + + private final String vertexName; + private final ProcessorDescriptor processorDescriptor; + + private final int parallelism; + private VertexLocationHint taskLocationsHint; + private final Resource taskResource; + private Map taskLocalResources; + private Map taskEnvironment; + + private final List inputVertices = new ArrayList(); + private final List outputVertices = new ArrayList(); + private final List inputEdgeIds = new ArrayList(); + private final List outputEdgeIds = new ArrayList(); + private String javaOpts = ""; + + + public Vertex(String vertexName, + ProcessorDescriptor processorDescriptor, + int parallelism, + Resource taskResource) { + this.vertexName = vertexName; + this.processorDescriptor = processorDescriptor; + this.parallelism = parallelism; + this.taskResource = taskResource; + if (parallelism == 0) { + throw new IllegalArgumentException("Parallelism cannot be 0"); + } + if (taskResource == null) { + throw new IllegalArgumentException("Resource cannot be null"); + } + } + + public String getVertexName() { // FIXME rename to getName() + return vertexName; + } + + public ProcessorDescriptor getProcessorDescriptor() { + return this.processorDescriptor; + } + + public int getParallelism() { 
+ return parallelism; + } + + public Resource getTaskResource() { + return taskResource; + } + + public Vertex setTaskLocationsHint(List locations) { + if (locations == null) { + return this; + } + assert locations.size() == parallelism; + taskLocationsHint = new VertexLocationHint(parallelism, locations); + return this; + } + + // used internally to create parallelism location resource file + VertexLocationHint getTaskLocationsHint() { + return taskLocationsHint; + } + + public Vertex setTaskLocalResources(Map localResources) { + this.taskLocalResources = localResources; + return this; + } + + public Map getTaskLocalResources() { + return taskLocalResources; + } + + public Vertex setTaskEnvironment(Map environment) { + this.taskEnvironment = environment; + return this; + } + + public Map getTaskEnvironment() { + return taskEnvironment; + } + + public Vertex setJavaOpts(String javaOpts){ + this. javaOpts = javaOpts; + return this; + } + + public String getJavaOpts(){ + return javaOpts; + } + + @Override + public String toString() { + return "[" + vertexName + " : " + processorDescriptor.getClassName() + "]"; + } + + void addInputVertex(Vertex inputVertex, String edgeId) { + inputVertices.add(inputVertex); + inputEdgeIds.add(edgeId); + } + + void addOutputVertex(Vertex outputVertex, String edgeId) { + outputVertices.add(outputVertex); + outputEdgeIds.add(edgeId); + } + + List getInputVertices() { + return inputVertices; + } + + List getOutputVertices() { + return outputVertices; + } + + List getInputEdgeIds() { + return inputEdgeIds; + } + + List getOutputEdgeIds() { + return outputEdgeIds; + } + + // FIXME how do we support profiling? Can't profile all tasks. 
+ +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java b/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java new file mode 100644 index 0000000..4f19314 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.dag.api; + +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class VertexLocationHint { + + private final int numTasks; + private final List taskLocationHints; + + public VertexLocationHint(int numTasks, + List taskLocationHints) { + this.numTasks = numTasks; + if (taskLocationHints != null) { + this.taskLocationHints = Collections.unmodifiableList(taskLocationHints); + } else { + this.taskLocationHints = null; + } + } + + public int getNumTasks() { + return numTasks; + } + + public List getTaskLocationHints() { + return taskLocationHints; + } + + @Override + public int hashCode() { + final int prime = 7883; + int result = 1; + result = prime * result + numTasks; + if (taskLocationHints != null) { + result = prime * result + taskLocationHints.hashCode(); + } + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + VertexLocationHint other = (VertexLocationHint) obj; + if (numTasks != other.numTasks) { + return false; + } + if (taskLocationHints != null) { + if (!taskLocationHints.equals(other.taskLocationHints)) { + return false; + } + } else if (other.taskLocationHints != null) { + return false; + } + return true; + } + + public static class TaskLocationHint { + + // Host names if any to be used + private final Set hosts; + // Rack names if any to be used + private final Set racks; + + public TaskLocationHint(Set hosts, Set racks) { + if (hosts != null) { + this.hosts = Collections.unmodifiableSet(hosts); + } else { + this.hosts = null; + } + if (racks != null) { + this.racks = Collections.unmodifiableSet(racks); + } else { + this.racks = null; + } + } + + public Set getDataLocalHosts() { + return hosts; + } + + public Set getRacks() { + return racks; + } + + @Override + public int hashCode() { + final int prime = 9397; + int result = 1; + 
result = ( hosts != null) ? + prime * result + hosts.hashCode() : + result + prime; + result = ( racks != null) ? + prime * result + racks.hashCode() : + result + prime; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + TaskLocationHint other = (TaskLocationHint) obj; + if (hosts != null) { + if (!hosts.equals(other.hosts)) { + return false; + } + } else if (other.hosts != null) { + return false; + } + if (racks != null) { + if (!racks.equals(other.racks)) { + return false; + } + } else if (other.racks != null) { + return false; + } + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java new file mode 100644 index 0000000..9062e8e --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java @@ -0,0 +1,67 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.dag.api.client; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.tez.dag.api.TezException; + +/* + * Interface class for monitoring the DAG running in a Tez DAG + * Application Master. + */ +public interface DAGClient extends Closeable { + + /** + * Get the YARN ApplicationId for the app running the DAG + * @return ApplicationId + */ + public ApplicationId getApplicationId(); + + @Private + /** + * Get the YARN ApplicationReport for the app running the DAG. For performance + * reasons this may be stale copy and should be used to access static info. It + * may be null. + * @return ApplicationReport or null + */ + public ApplicationReport getApplicationReport(); + + /** + * Get the status of the specified DAG + */ + public DAGStatus getDAGStatus() throws IOException, TezException; + + /** + * Get the status of a Vertex of a DAG + */ + public VertexStatus getVertexStatus(String vertexName) + throws IOException, TezException; + + /** + * Kill a running DAG + * + */ + public void tryKillDAG() throws TezException, IOException; + +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java new file mode 100644 index 0000000..d61173d --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java @@ -0,0 +1,130 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license 
agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api.client;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProtoOrBuilder;
+import org.apache.tez.dag.api.records.DAGProtos.StringProgressPairProto;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+/**
+ * Status of a DAG, read from a {@link DAGStatusProtoOrBuilder}. The
+ * {@link Progress} objects handed out by this class are created on first
+ * access and then reused.
+ */
+public class DAGStatus {
+
+  /** DAG states exposed to clients. */
+  public enum State {
+    SUBMITTED,
+    INITING,
+    RUNNING,
+    SUCCEEDED,
+    KILLED,
+    FAILED,
+    ERROR,
+  }
+
+  DAGStatusProtoOrBuilder proxy = null;
+  Progress progress = null;
+  Map<String, Progress> vertexProgress = null;
+
+  /**
+   * @param proxy protobuf message (or builder) that holds the DAG status
+   */
+  public DAGStatus(DAGStatusProtoOrBuilder proxy) {
+    this.proxy = proxy;
+  }
+
+  /**
+   * Translates the protobuf state into a {@link State}.
+   *
+   * @return the client-facing state; initing and terminating DAGs are
+   *         reported as {@link State#RUNNING}
+   * @throws TezUncheckedException if the protobuf state has no mapping
+   */
+  public State getState() {
+    switch(proxy.getState()) {
+    case DAG_SUBMITTED:
+      return DAGStatus.State.SUBMITTED;
+    // For simplicity, initing/terminating states are presented as running
+    case DAG_INITING:
+    case DAG_TERMINATING:
+    case DAG_RUNNING:
+      return DAGStatus.State.RUNNING;
+    case DAG_SUCCEEDED:
+      return DAGStatus.State.SUCCEEDED;
+    case DAG_FAILED:
+      return DAGStatus.State.FAILED;
+    case DAG_KILLED:
+      return DAGStatus.State.KILLED;
+    case DAG_ERROR:
+      return DAGStatus.State.ERROR;
+    default:
+      throw new TezUncheckedException("Unsupported value for DAGStatus.State : " +
+          proxy.getState());
+    }
+  }
+
+  /**
+   * @return true when the state is SUCCEEDED, FAILED, KILLED or ERROR
+   */
+  public boolean isCompleted() {
+    State state = getState();
+    return (state == State.SUCCEEDED ||
+            state == State.FAILED ||
+            state == State.KILLED ||
+            state == State.ERROR);
+  }
+
+  /**
+   * @return the diagnostics list carried by the underlying protobuf message
+   */
+  public List<String> getDiagnostics() {
+    return proxy.getDiagnosticsList();
+  }
+
+  /**
+   * Gets overall progress value of the DAG.
+   *
+   * @return Progress of the DAG. May be null when the DAG is not running. May
+   *         be null when the DAG is running and the application master cannot
+   *         be reached - e.g. when the execution platform has restarted the
+   *         application master.
+   * @see Progress
+   */
+  public Progress getDAGProgress() {
+    if(progress == null && proxy.hasDAGProgress()) {
+      progress = new Progress(proxy.getDAGProgress());
+    }
+    return progress;
+  }
+
+  /**
+   * Get the progress of the vertices in the DAG.
+   *
+   * @return Map from the key of each reported entry (presumably the vertex
+   *         name) to the Progress of that entry. May be null when the DAG is
+   *         not running. May be null when the DAG is running and the
+   *         application master cannot be reached - e.g. when the execution
+   *         platform has restarted the application master.
+   * @see Progress
+   */
+  public Map<String, Progress> getVertexProgress() {
+    if(vertexProgress == null) {
+      List<StringProgressPairProto> kvList = proxy.getVertexProgressList();
+      if(kvList != null) {
+        vertexProgress = new HashMap<String, Progress>(kvList.size());
+        for(StringProgressPairProto kv : kvList){
+          vertexProgress.put(kv.getKey(), new Progress(kv.getProgress()));
+        }
+      }
+    }
+    return vertexProgress;
+  }
+
+  @Override
+  public String toString() {
+    return "status=" + getState()
+        + ", progress=" + getDAGProgress();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/client/Progress.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/Progress.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/Progress.java
new file mode 100644
index 0000000..9577320
--- /dev/null
+++
b/tez-api/src/main/java/org/apache/tez/dag/api/client/Progress.java
@@ -0,0 +1,67 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.api.client;
+
+import org.apache.tez.dag.api.records.DAGProtos.ProgressProtoOrBuilder;
+
+/**
+ * Task counters read from a {@link ProgressProtoOrBuilder}. Every getter
+ * delegates directly to the wrapped protobuf object.
+ */
+public class Progress {
+
+  ProgressProtoOrBuilder proxy = null;
+
+  /**
+   * @param proxy protobuf message (or builder) that supplies the counters
+   */
+  Progress(ProgressProtoOrBuilder proxy) {
+    this.proxy = proxy;
+  }
+
+  /** @return the total task count reported by the wrapped message */
+  public int getTotalTaskCount() {
+    return proxy.getTotalTaskCount();
+  }
+
+  /** @return the succeeded task count reported by the wrapped message */
+  public int getSucceededTaskCount() {
+    return proxy.getSucceededTaskCount();
+  }
+
+  /** @return the running task count reported by the wrapped message */
+  public int getRunningTaskCount() {
+    return proxy.getRunningTaskCount();
+  }
+
+  /** @return the failed task count reported by the wrapped message */
+  public int getFailedTaskCount() {
+    return proxy.getFailedTaskCount();
+  }
+
+  /** @return the killed task count reported by the wrapped message */
+  public int getKilledTaskCount() {
+    return proxy.getKilledTaskCount();
+  }
+
+  /**
+   * @return the five counters on one line, each preceded by its label
+   */
+  @Override
+  public String toString() {
+    return new StringBuilder()
+        .append("TotalTasks: ").append(getTotalTaskCount())
+        .append(" Succeeded: ").append(getSucceededTaskCount())
+        .append(" Running: ").append(getRunningTaskCount())
+        .append(" Failed: ").append(getFailedTaskCount())
+        .append(" Killed: ").append(getKilledTaskCount())
+        .toString();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java new file mode 100644 index 0000000..ce5dbe0 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java @@ -0,0 +1,78 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/
+
+package org.apache.tez.dag.api.client;
+
+import java.util.List;
+
+import org.apache.tez.dag.api.records.DAGProtos.VertexStatusProtoOrBuilder;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+/**
+ * Status of a single vertex, read from a {@link VertexStatusProtoOrBuilder}.
+ * The {@link Progress} object is created on first access and then reused.
+ */
+public class VertexStatus {
+
+  /** Vertex states exposed to clients. */
+  public enum State {
+    INITED,
+    RUNNING,
+    SUCCEEDED,
+    KILLED,
+    FAILED,
+    ERROR,
+    TERMINATING,
+  }
+
+  VertexStatusProtoOrBuilder proxy = null;
+  Progress progress = null;
+
+  /**
+   * @param proxy protobuf message (or builder) that holds the vertex status
+   */
+  public VertexStatus(VertexStatusProtoOrBuilder proxy) {
+    this.proxy = proxy;
+  }
+
+  /**
+   * Translates the protobuf state into a {@link State}.
+   *
+   * @return the client-facing state of the vertex
+   * @throws TezUncheckedException if the protobuf state has no mapping
+   */
+  public State getState() {
+    switch(proxy.getState()) {
+    case VERTEX_INITED:
+      return VertexStatus.State.INITED;
+    case VERTEX_RUNNING:
+      return VertexStatus.State.RUNNING;
+    case VERTEX_SUCCEEDED:
+      return VertexStatus.State.SUCCEEDED;
+    case VERTEX_FAILED:
+      return VertexStatus.State.FAILED;
+    case VERTEX_KILLED:
+      return VertexStatus.State.KILLED;
+    case VERTEX_ERROR:
+      return VertexStatus.State.ERROR;
+    case VERTEX_TERMINATING:
+      return VertexStatus.State.TERMINATING;
+    default:
+      throw new TezUncheckedException("Unsupported value for VertexStatus.State : " +
+          proxy.getState());
+    }
+  }
+
+  /**
+   * @return the diagnostics list carried by the underlying protobuf message
+   */
+  public List<String> getDiagnostics() {
+    return proxy.getDiagnosticsList();
+  }
+
+  /**
+   * @return Progress of the vertex, or null when the underlying message
+   *         carries no progress
+   */
+  public Progress getProgress() {
+    if(progress == null && proxy.hasProgress()) {
+      progress = new Progress(proxy.getProgress());
+    }
+    return progress;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPB.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPB.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPB.java
new file mode 100644
index 0000000..a1ee18f
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPB.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation
(ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api.client.rpc;
+
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.DAGClientAMProtocol;
+
+/**
+ * Marker interface that declares no members of its own. It extends
+ * {@link DAGClientAMProtocol.BlockingInterface} and attaches the protocol
+ * name and version given in the {@link ProtocolInfo} annotation.
+ */
+@ProtocolInfo(
+    protocolVersion = 1,
+    protocolName = "org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolPB")
+public interface DAGClientAMProtocolBlockingPB
+    extends DAGClientAMProtocol.BlockingInterface {
+}