Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 91706102E2 for ; Wed, 6 Nov 2013 00:35:41 +0000 (UTC) Received: (qmail 40454 invoked by uid 500); 6 Nov 2013 00:35:41 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 40430 invoked by uid 500); 6 Nov 2013 00:35:41 -0000 Mailing-List: contact commits-help@tez.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.incubator.apache.org Delivered-To: mailing list commits@tez.incubator.apache.org Received: (qmail 40423 invoked by uid 99); 6 Nov 2013 00:35:41 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Nov 2013 00:35:41 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Wed, 06 Nov 2013 00:35:37 +0000 Received: (qmail 36864 invoked by uid 99); 6 Nov 2013 00:35:17 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Nov 2013 00:35:17 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id CBBDB45C26; Wed, 6 Nov 2013 00:35:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hitesh@apache.org To: commits@tez.incubator.apache.org Date: Wed, 06 Nov 2013 00:35:17 -0000 Message-Id: <56e4d619928f4e9eb7da14bea468ab0d@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] git commit: TEZ-12. Support for counters. (hitesh) X-Virus-Checked: Checked by ClamAV on apache.org TEZ-12. Support for counters. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/6fddbd01 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/6fddbd01 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/6fddbd01 Branch: refs/heads/master Commit: 6fddbd01b5050d2bd58b4edbd0464798e1b10a1f Parents: 83a657b Author: Hitesh Shah Authored: Tue Nov 5 16:34:57 2013 -0800 Committer: Hitesh Shah Committed: Tue Nov 5 16:34:57 2013 -0800 ---------------------------------------------------------------------- .../apache/tez/dag/api/DagTypeConverters.java | 100 ++++++++++++++++- .../apache/tez/dag/api/client/DAGClient.java | 15 ++- .../apache/tez/dag/api/client/DAGStatus.java | 27 ++++- .../tez/dag/api/client/StatusGetOpts.java | 28 +++++ .../apache/tez/dag/api/client/VertexStatus.java | 43 ++++++-- .../dag/api/client/rpc/DAGClientRPCImpl.java | 57 +++++++--- tez-api/src/main/proto/DAGApiRecords.proto | 42 +++++-- .../src/main/proto/DAGClientAMProtocol.proto | 2 + .../tez/dag/api/client/DAGStatusBuilder.java | 21 ++-- .../tez/dag/api/client/ProgressBuilder.java | 15 ++- .../tez/dag/api/client/VertexStatusBuilder.java | 20 +++- ...DAGClientAMProtocolBlockingPBServerImpl.java | 8 +- .../org/apache/tez/dag/app/DAGAppMaster.java | 26 +++-- .../java/org/apache/tez/dag/app/dag/DAG.java | 7 +- .../java/org/apache/tez/dag/app/dag/Vertex.java | 11 +- .../apache/tez/dag/app/dag/impl/DAGImpl.java | 20 ++-- .../apache/tez/dag/app/dag/impl/TaskImpl.java | 80 +++++++------- .../apache/tez/dag/app/dag/impl/VertexImpl.java | 109 ++++++++++--------- .../tez/mapreduce/examples/ExampleDriver.java | 41 ++++++- .../mapreduce/examples/FilterLinesByWord.java | 27 ++--- .../examples/GroupByOrderByMRRTest.java | 11 +- .../tez/mapreduce/examples/MRRSleepJob.java | 2 +- .../mapreduce/examples/OrderedWordCount.java | 34 ++++-- .../apache/tez/mapreduce/client/YARNRunner.java | 2 +- .../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 4 +- 25 files changed, 529 insertions(+), 223 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java index 803c943..098a201 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java @@ -20,8 +20,11 @@ package org.apache.tez.dag.api; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.TreeSet; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResource; @@ -32,11 +35,16 @@ import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.client.TezSessionStatus; +import org.apache.tez.common.counters.CounterGroup; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.EdgeProperty.DataSourceType; import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; +import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezSessionStatusProto; +import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; import org.apache.tez.dag.api.records.DAGProtos.EdgePlan; import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType; @@ -50,6 +58,9 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceVisibility; import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration; import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint; import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto; +import org.apache.tez.dag.api.records.DAGProtos.TezCounterGroupProto; +import org.apache.tez.dag.api.records.DAGProtos.TezCounterProto; +import org.apache.tez.dag.api.records.DAGProtos.TezCountersProto; import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto; import com.google.protobuf.ByteString; @@ -166,9 +177,8 @@ public class DagTypeConverters { public static String convertToDAGPlan(URL resource) { // see above notes on HDFS URL handling - String out = resource.getScheme() + "://" + resource.getHost() + ":" + resource.getPort() - + resource.getFile(); - return out; + return resource.getScheme() + "://" + resource.getHost() + + ":" + resource.getPort() + resource.getFile(); } public static Map createLocalResourceMapFromDAGPlan( @@ -380,4 +390,88 @@ public class DagTypeConverters { plr.hasPattern() ? plr.getPattern() : null); } + public static TezCounters convertTezCountersFromProto(TezCountersProto proto) { + TezCounters counters = new TezCounters(); + for (TezCounterGroupProto counterGroupProto : proto.getCounterGroupsList()) { + CounterGroup group = counters.addGroup(counterGroupProto.getName(), + counterGroupProto.getDisplayName()); + for (TezCounterProto counterProto : + counterGroupProto.getCountersList()) { + TezCounter counter = group.findCounter( + counterProto.getName(), + counterProto.getDisplayName()); + counter.setValue(counterProto.getValue()); + } + } + return counters; + } + + public static TezCountersProto convertTezCountersToProto( + TezCounters counters) { + TezCountersProto.Builder builder = TezCountersProto.newBuilder(); + Iterator groupIterator = counters.iterator(); + int groupIndex = 0; + while (groupIterator.hasNext()) { + CounterGroup counterGroup = groupIterator.next(); + TezCounterGroupProto.Builder groupBuilder = + TezCounterGroupProto.newBuilder(); + groupBuilder.setName(counterGroup.getName()); + groupBuilder.setDisplayName(counterGroup.getDisplayName()); + Iterator counterIterator = counterGroup.iterator(); + int counterIndex = 0; + while (counterIterator.hasNext()) { + TezCounter counter = counterIterator.next(); + TezCounterProto tezCounterProto = TezCounterProto.newBuilder() + .setName(counter.getName()) + .setDisplayName(counter.getDisplayName()) + .setValue(counter.getValue()) + .build(); + groupBuilder.addCounters(counterIndex, tezCounterProto); + ++counterIndex; + } + builder.addCounterGroups(groupIndex, groupBuilder.build()); + ++groupIndex; + } + return builder.build(); + } + + public static DAGProtos.StatusGetOptsProto convertStatusGetOptsToProto( + StatusGetOpts statusGetOpts) { + switch (statusGetOpts) { + case GET_COUNTERS: + return DAGProtos.StatusGetOptsProto.GET_COUNTERS; + } + throw new TezUncheckedException("Could not convert StatusGetOpts to" + + " proto"); + } + + public static StatusGetOpts convertStatusGetOptsFromProto( + DAGProtos.StatusGetOptsProto proto) { + switch (proto) { + case GET_COUNTERS: + return StatusGetOpts.GET_COUNTERS; + } + throw new TezUncheckedException("Could not convert to StatusGetOpts from" + + " proto"); + } + + public static List convertStatusGetOptsToProto( + Set statusGetOpts) { + List protos = + new ArrayList(statusGetOpts.size()); + for (StatusGetOpts opt : statusGetOpts) { + protos.add(convertStatusGetOptsToProto(opt)); + } + return protos; + } + + public static Set convertStatusGetOptsFromProto( + List protoList) { + Set opts = new TreeSet(); + for (DAGProtos.StatusGetOptsProto proto : protoList) { + opts.add(convertStatusGetOptsFromProto(proto)); + } + return opts; + } + } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java index 9062e8e..bbb225c 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGClient.java @@ -20,6 +20,7 @@ package org.apache.tez.dag.api.client; import java.io.Closeable; import java.io.IOException; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -49,19 +50,25 @@ public interface DAGClient extends Closeable { /** * Get the status of the specified DAG + * @param statusOptions Optionally, retrieve additional information based on + * specified options */ - public DAGStatus getDAGStatus() throws IOException, TezException; + public DAGStatus getDAGStatus(Set statusOptions) + throws IOException, TezException; /** * Get the status of a Vertex of a DAG + * @param statusOptions Optionally, retrieve additional information based on + * specified options */ - public VertexStatus getVertexStatus(String vertexName) - throws IOException, TezException; + public VertexStatus getVertexStatus(String vertexName, + Set statusOptions) + throws IOException, TezException; /** * Kill a running DAG * */ - public void tryKillDAG() throws TezException, IOException; + public void tryKillDAG() throws IOException, TezException; } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java index 8b7277f..b2a4d21 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/DAGStatus.java @@ -21,8 +21,11 @@ package org.apache.tez.dag.api.client; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.util.StringUtils; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProtoOrBuilder; import org.apache.tez.dag.api.records.DAGProtos.StringProgressPairProto; import org.apache.tez.dag.api.TezUncheckedException; @@ -40,11 +43,13 @@ public class DAGStatus { KILLED, FAILED, ERROR, - }; + } DAGStatusProtoOrBuilder proxy = null; Progress progress = null; Map vertexProgress = null; + TezCounters dagCounters = null; + AtomicBoolean countersInitialized = new AtomicBoolean(false); public DAGStatus(DAGStatusProtoOrBuilder proxy) { this.proxy = proxy; @@ -123,13 +128,27 @@ public class DAGStatus { return vertexProgress; } + public TezCounters getDAGCounters() { + if (countersInitialized.get()) { + return dagCounters; + } + if (proxy.hasDagCounters()) { + dagCounters = DagTypeConverters.convertTezCountersFromProto( + proxy.getDagCounters()); + } + countersInitialized.set(true); + return dagCounters; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("status=" + getState() - + ", progress=" + getDAGProgress() - + ", diagnostics=" - + StringUtils.join(LINE_SEPARATOR, getDiagnostics())); + + ", progress=" + getDAGProgress() + + ", diagnostics=" + + StringUtils.join(LINE_SEPARATOR, getDiagnostics()) + + ", counters=" + + (dagCounters == null ? "null" : dagCounters.toString())); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/java/org/apache/tez/dag/api/client/StatusGetOpts.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/StatusGetOpts.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/StatusGetOpts.java new file mode 100644 index 0000000..922ab24 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/StatusGetOpts.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.api.client; + +/** + * Status Get Options used when making calls like getDAGStatus and + * getVertexStatus in DAGClient + */ +public enum StatusGetOpts { + /** Retrieve Counters with Status */ + GET_COUNTERS +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java index ce5dbe0..5ea190f 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/VertexStatus.java @@ -19,12 +19,15 @@ package org.apache.tez.dag.api.client; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.records.DAGProtos.VertexStatusProtoOrBuilder; import org.apache.tez.dag.api.TezUncheckedException; public class VertexStatus { - + public enum State { INITED, RUNNING, @@ -33,11 +36,13 @@ public class VertexStatus { FAILED, ERROR, TERMINATING, - }; - + } + VertexStatusProtoOrBuilder proxy = null; Progress progress = null; - + TezCounters vertexCounters = null; + private AtomicBoolean countersInitialized = new AtomicBoolean(false); + public VertexStatus(VertexStatusProtoOrBuilder proxy) { this.proxy = proxy; } @@ -59,9 +64,9 @@ public class VertexStatus { case VERTEX_TERMINATING: return VertexStatus.State.TERMINATING; default: - throw new TezUncheckedException("Unsupported value for VertexStatus.State : " + - proxy.getState()); - } + throw new TezUncheckedException( + "Unsupported value for VertexStatus.State : " + proxy.getState()); + } } public List getDiagnostics() { @@ -72,7 +77,29 @@ public class VertexStatus { if(progress == null && proxy.hasProgress()) { progress = new Progress(proxy.getProgress()); } - return progress; + return progress; + } + + public TezCounters getVertexCounters() { + if (countersInitialized.get()) { + return vertexCounters; + } + if (proxy.hasVertexCounters()) { + vertexCounters = DagTypeConverters.convertTezCountersFromProto( + proxy.getVertexCounters()); + } + countersInitialized.set(true); + return vertexCounters; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("status=" + getState() + + ", progress=" + getProgress() + + ", counters=" + + (vertexCounters == null ? "null" : vertexCounters.toString())); + return sb.toString(); } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java index dae5625..06cebca 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java @@ -21,6 +21,7 @@ package org.apache.tez.dag.api.client.rpc; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Collections; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,11 +34,13 @@ import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.VertexStatus; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto; @@ -74,10 +77,11 @@ public class DAGClientRPCImpl implements DAGClient { } @Override - public DAGStatus getDAGStatus() throws IOException, TezException { + public DAGStatus getDAGStatus(Set statusOptions) + throws IOException, TezException { if(createAMProxyIfNeeded()) { try { - return getDAGStatusViaAM(); + return getDAGStatusViaAM(statusOptions); } catch (TezException e) { resetProxy(e); // create proxy again } @@ -88,11 +92,12 @@ public class DAGClientRPCImpl implements DAGClient { } @Override - public VertexStatus getVertexStatus(String vertexName) - throws IOException, TezException { + public VertexStatus getVertexStatus(String vertexName, + Set statusOptions) + throws IOException, TezException { if(createAMProxyIfNeeded()) { try { - return getVertexStatusViaAM(vertexName); + return getVertexStatusViaAM(vertexName, statusOptions); } catch (TezException e) { resetProxy(e); // create proxy again } @@ -102,6 +107,8 @@ public class DAGClientRPCImpl implements DAGClient { return null; } + + @Override public void tryKillDAG() throws TezException, IOException { if(LOG.isDebugEnabled()) { @@ -141,23 +148,30 @@ public class DAGClientRPCImpl implements DAGClient { proxy = null; } - DAGStatus getDAGStatusViaAM() throws IOException, TezException { + DAGStatus getDAGStatusViaAM(Set statusOptions) + throws IOException, TezException { if(LOG.isDebugEnabled()) { LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId); } - GetDAGStatusRequestProto requestProto = - GetDAGStatusRequestProto.newBuilder().setDagId(dagId).build(); + GetDAGStatusRequestProto.Builder requestProtoBuilder = + GetDAGStatusRequestProto.newBuilder() + .setDagId(dagId); + + if (statusOptions != null) { + requestProtoBuilder.addAllStatusOptions( + DagTypeConverters.convertStatusGetOptsToProto(statusOptions)); + } + try { return new DAGStatus( - proxy.getDAGStatus(null, requestProto).getDagStatus()); + proxy.getDAGStatus(null, + requestProtoBuilder.build()).getDagStatus()); } catch (ServiceException e) { // TEZ-151 retrieve wrapped TezException throw new TezException(e); } } - - DAGStatus getDAGStatusViaRM() throws TezException, IOException { if(LOG.isDebugEnabled()) { LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId); @@ -175,7 +189,7 @@ public class DAGClientRPCImpl implements DAGClient { DAGStatusProto.Builder builder = DAGStatusProto.newBuilder(); DAGStatus dagStatus = new DAGStatus(builder); - DAGStatusStateProto dagState = null; + DAGStatusStateProto dagState; switch (appReport.getYarnApplicationState()) { case NEW: case NEW_SAVING: @@ -224,18 +238,27 @@ public class DAGClientRPCImpl implements DAGClient { return dagStatus; } - VertexStatus getVertexStatusViaAM(String vertexName) throws TezException { + VertexStatus getVertexStatusViaAM(String vertexName, + Set statusOptions) + throws TezException { if (LOG.isDebugEnabled()) { LOG.debug("GetVertexStatus via AM for app: " + appId + " dag: " + dagId + " vertex: " + vertexName); } - GetVertexStatusRequestProto requestProto = - GetVertexStatusRequestProto.newBuilder(). - setDagId(dagId).setVertexName(vertexName).build(); + GetVertexStatusRequestProto.Builder requestProtoBuilder = + GetVertexStatusRequestProto.newBuilder() + .setDagId(dagId) + .setVertexName(vertexName); + + if (statusOptions != null) { + requestProtoBuilder.addAllStatusOptions( + DagTypeConverters.convertStatusGetOptsToProto(statusOptions)); + } try { return new VertexStatus( - proxy.getVertexStatus(null, requestProto).getVertexStatus()); + proxy.getVertexStatus(null, + requestProtoBuilder.build()).getVertexStatus()); } catch (ServiceException e) { // TEZ-151 retrieve wrapped TezException throw new TezException(e); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/proto/DAGApiRecords.proto ---------------------------------------------------------------------- diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto index b948e60..9ce51a1 100644 --- a/tez-api/src/main/proto/DAGApiRecords.proto +++ b/tez-api/src/main/proto/DAGApiRecords.proto @@ -22,12 +22,12 @@ option java_generate_equals_and_hash = true; // DAG plan messages -// Many of these types have a dual in the Tez-api. To reduce confusion, these types have prefix or suffix +// Many of these types have a dual in the Tez-api. To reduce confusion, these types have prefix or suffix // of "Plan" to indicate they are to be used in the dag-plan. -// The big types use a suffix: JobPlan, VertexPlan, EdgePlan +// The big types use a suffix: JobPlan, VertexPlan, EdgePlan // --> these get more direct use in the runtime and the naming is natural. // The enums and utility types use prefix: PlanVertexType, PlanEdgeConnectionPaatern, etc -// --> there is not great naming choice for these that avoids ambiguity, but this one seems acceptable. +// --> there is not great naming choice for these that avoids ambiguity, but this one seems acceptable. enum PlanVertexType { INPUT = 0; @@ -94,7 +94,7 @@ message PlanTaskConfiguration { required string javaOpts = 4; required string taskModule = 5; repeated PlanLocalResource localResource = 6; - repeated PlanKeyValuePair environmentSetting = 8; + repeated PlanKeyValuePair environmentSetting = 7; } message TezEntityDescriptorProto { @@ -113,11 +113,11 @@ message VertexPlan { required PlanVertexType type = 2; optional TezEntityDescriptorProto processor_descriptor = 3; required PlanTaskConfiguration taskConfig = 4; - repeated PlanTaskLocationHint taskLocationHint = 7; - repeated string inEdgeId = 8; - repeated string outEdgeId = 9; - repeated RootInputLeafOutputProto inputs = 10; - repeated RootInputLeafOutputProto outputs = 11; + repeated PlanTaskLocationHint taskLocationHint = 5; + repeated string inEdgeId = 6; + repeated string outEdgeId = 7; + repeated RootInputLeafOutputProto inputs = 8; + repeated RootInputLeafOutputProto outputs = 9; } message EdgePlan { @@ -165,6 +165,7 @@ message VertexStatusProto { optional VertexStatusStateProto state = 1; repeated string diagnostics = 2; optional ProgressProto progress = 3; + optional TezCountersProto vertexCounters = 4; } enum DAGStatusStateProto { @@ -187,9 +188,30 @@ message DAGStatusProto { optional DAGStatusStateProto state = 1; repeated string diagnostics = 2; optional ProgressProto DAGProgress = 3; - repeated StringProgressPairProto vertexProgress = 4; + repeated StringProgressPairProto vertexProgress = 4; + optional TezCountersProto dagCounters = 5; } message PlanLocalResourcesProto { repeated PlanLocalResource localResources = 1; } + +message TezCounterProto { + optional string name = 1; + optional string display_name = 2; + optional int64 value = 3; +} + +message TezCounterGroupProto { + optional string name = 1; + optional string display_name = 2; + repeated TezCounterProto counters = 3; +} + +message TezCountersProto { + repeated TezCounterGroupProto counter_groups = 1; +} + +enum StatusGetOptsProto { + GET_COUNTERS = 0; +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-api/src/main/proto/DAGClientAMProtocol.proto ---------------------------------------------------------------------- diff --git a/tez-api/src/main/proto/DAGClientAMProtocol.proto b/tez-api/src/main/proto/DAGClientAMProtocol.proto index 1236190..0f29364 100644 --- a/tez-api/src/main/proto/DAGClientAMProtocol.proto +++ b/tez-api/src/main/proto/DAGClientAMProtocol.proto @@ -34,6 +34,7 @@ message GetAllDAGsResponseProto { message GetDAGStatusRequestProto { optional string dagId = 1; + repeated StatusGetOptsProto statusOptions = 3; } message GetDAGStatusResponseProto { @@ -43,6 +44,7 @@ message GetDAGStatusResponseProto { message GetVertexStatusRequestProto { optional string dagId = 1; optional string vertexName = 2; + repeated StatusGetOptsProto statusOptions = 3; } message GetVertexStatusResponseProto { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java index 2b0f543..62b1399 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGStatusBuilder.java @@ -20,6 +20,8 @@ package org.apache.tez.dag.api.client; import java.util.List; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto; import org.apache.tez.dag.api.records.DAGProtos.StringProgressPairProto; import org.apache.tez.dag.api.TezUncheckedException; @@ -33,32 +35,37 @@ public class DAGStatusBuilder extends DAGStatus { public DAGStatusBuilder() { super(DAGStatusProto.newBuilder()); } - + public void setState(DAGState state) { getBuilder().setState(getProtoState(state)); } - + public void setDiagnostics(List diagnostics) { Builder builder = getBuilder(); builder.clearDiagnostics(); builder.addAllDiagnostics(diagnostics); } - + public void setDAGProgress(ProgressBuilder progress) { getBuilder().setDAGProgress(progress.getProto()); } - + + public void setDAGCounters(TezCounters counters) { + getBuilder().setDagCounters( + DagTypeConverters.convertTezCountersToProto(counters)); + } + public void addVertexProgress(String name, ProgressBuilder progress) { StringProgressPairProto.Builder builder = StringProgressPairProto.newBuilder(); builder.setKey(name); builder.setProgress(progress.getProto()); getBuilder().addVertexProgress(builder.build()); } - + public DAGStatusProto getProto() { return getBuilder().build(); } - + private DAGStatusStateProto getProtoState(DAGState state) { switch(state) { case NEW: @@ -80,7 +87,7 @@ public class DAGStatusBuilder extends DAGStatus { throw new TezUncheckedException("Unsupported value for DAGState : " + state); } } - + private DAGStatusProto.Builder getBuilder() { return (Builder) this.proxy; } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/api/client/ProgressBuilder.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/ProgressBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/ProgressBuilder.java index 6cedb3f..99fcfa0 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/ProgressBuilder.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/ProgressBuilder.java @@ -20,38 +20,37 @@ package org.apache.tez.dag.api.client; import org.apache.tez.dag.api.records.DAGProtos.ProgressProto; import org.apache.tez.dag.api.records.DAGProtos.ProgressProto.Builder; -import org.apache.tez.dag.api.client.Progress; public class ProgressBuilder extends Progress { public ProgressBuilder() { super(ProgressProto.newBuilder()); } - + public ProgressProto getProto() { return getBuilder().build(); } - + public void setTotalTaskCount(int count) { getBuilder().setTotalTaskCount(count); } - + public void setSucceededTaskCount(int count) { getBuilder().setSucceededTaskCount(count); } - + public void setRunningTaskCount(int count) { getBuilder().setRunningTaskCount(count); } - + public void setFailedTaskCount(int count) { getBuilder().setFailedTaskCount(count); } - + public void setKilledTaskCount(int count) { getBuilder().setKilledTaskCount(count); } - + private ProgressProto.Builder getBuilder() { return (Builder) this.proxy; } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java index 66de71f..47bbb2c 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/VertexStatusBuilder.java @@ -20,6 +20,8 @@ package org.apache.tez.dag.api.client; import java.util.List; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.records.DAGProtos.VertexStatusProto; import org.apache.tez.dag.api.records.DAGProtos.VertexStatusProto.Builder; import org.apache.tez.dag.api.records.DAGProtos.VertexStatusStateProto; @@ -32,28 +34,34 @@ public class VertexStatusBuilder extends VertexStatus { public VertexStatusBuilder() { super(VertexStatusProto.newBuilder()); } - + public void setState(VertexState state) { getBuilder().setState(getProtoState(state)); } - + public void setDiagnostics(List diagnostics) { Builder builder = getBuilder(); builder.clearDiagnostics(); builder.addAllDiagnostics(diagnostics); } - + public void setProgress(ProgressBuilder progress) { getBuilder().setProgress(progress.getProto()); } - + + public void setVertexCounters(TezCounters counters) { + getBuilder().setVertexCounters( + DagTypeConverters.convertTezCountersToProto(counters)); + } + public VertexStatusProto getProto() { return getBuilder().build(); } - + private VertexStatusStateProto getProtoState(VertexState state) { switch(state) { case NEW: + case INITIALIZING: case INITED: return VertexStatusStateProto.VERTEX_INITED; case RUNNING: @@ -72,7 +80,7 @@ public class VertexStatusBuilder extends VertexStatus { throw new TezUncheckedException("Unsupported value for VertexState : " + state); } } - + private VertexStatusProto.Builder getBuilder() { return (Builder) this.proxy; } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java index eb1ff48..959fbbc 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java @@ -74,7 +74,9 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements try { String dagId = request.getDagId(); DAGStatus status; - status = real.getDAGStatus(dagId); + status = real.getDAGStatus(dagId, + DagTypeConverters.convertStatusGetOptsFromProto( + request.getStatusOptionsList())); assert status instanceof DAGStatusBuilder; DAGStatusBuilder builder = (DAGStatusBuilder) status; return GetDAGStatusResponseProto.newBuilder(). @@ -90,7 +92,9 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements try { String dagId = request.getDagId(); String vertexName = request.getVertexName(); - VertexStatus status = real.getVertexStatus(dagId, vertexName); + VertexStatus status = real.getVertexStatus(dagId, vertexName, + DagTypeConverters.convertStatusGetOptsFromProto( + request.getStatusOptionsList())); assert status instanceof VertexStatusBuilder; VertexStatusBuilder builder = (VertexStatusBuilder) status; return GetVertexStatusResponseProto.newBuilder(). http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 6d2ec19..10d05ff 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -32,6 +32,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.UUID; @@ -85,6 +86,7 @@ import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.client.DAGClientServer; import org.apache.tez.dag.api.client.DAGStatus; +import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.VertexStatus; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; @@ -146,7 +148,7 @@ import org.apache.tez.runtime.library.common.security.TokenCache; @SuppressWarnings("rawtypes") public class DAGAppMaster extends AbstractService { - + private static final Log LOG = LogFactory.getLog(DAGAppMaster.class); /** @@ -474,7 +476,7 @@ public class DAGAppMaster extends AbstractService { protected DAG createDAG(DAGPlan dagPB) { TezDAGID dagId = new TezDAGID(appAttemptID.getApplicationId(), dagCounter.incrementAndGet()); - + // Prepare the TaskAttemptListener server for authentication of Containers // TaskAttemptListener gets the information via jobTokenSecretManager. String dagIdString = dagId.toString(); @@ -500,7 +502,7 @@ public class DAGAppMaster extends AbstractService { return newDag; } // end createDag() - + protected void addIfService(Object object, boolean addDispatcher) { if (object instanceof Service) { Service service = (Service) object; @@ -712,13 +714,17 @@ public class DAGAppMaster extends AbstractService { return Collections.singletonList(currentDAG.getID().toString()); } - public DAGStatus getDAGStatus(String dagIdStr) throws TezException { - return getDAG(dagIdStr).getDAGStatus(); + public DAGStatus getDAGStatus(String dagIdStr, + Set statusOptions) + throws TezException { + return getDAG(dagIdStr).getDAGStatus(statusOptions); } - public VertexStatus getVertexStatus(String dagIdStr, String vertexName) + public VertexStatus getVertexStatus(String dagIdStr, String vertexName, + Set statusOptions) throws TezException{ - VertexStatus status = getDAG(dagIdStr).getVertexStatus(vertexName); + VertexStatus status = getDAG(dagIdStr) + .getVertexStatus(vertexName, statusOptions); if(status == null) { throw new TezException("Unknown vertexName: " + vertexName); } @@ -1414,11 +1420,11 @@ public class DAGAppMaster extends AbstractService { appMaster.currentUser = UserGroupInformation.getCurrentUser(); Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); - + UserGroupInformation appMasterUgi = UserGroupInformation .createRemoteUser(jobUserName); appMasterUgi.addCredentials(credentials); - + // Now remove the AM->RM token so tasks don't have it Iterator> iter = credentials.getAllTokens().iterator(); while (iter.hasNext()) { @@ -1427,7 +1433,7 @@ public class DAGAppMaster extends AbstractService { iter.remove(); } } - + appMaster.tokens = credentials; appMasterUgi.doAs(new PrivilegedExceptionAction() { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java index ce1ee89..4e12603 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java @@ -20,11 +20,13 @@ package org.apache.tez.dag.app.dag; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.api.client.DAGStatusBuilder; import org.apache.tez.dag.api.client.VertexStatusBuilder; @@ -66,8 +68,9 @@ public interface DAG { Configuration getConf(); DAGPlan getJobPlan(); - DAGStatusBuilder getDAGStatus(); - VertexStatusBuilder getVertexStatus(String vertexName); + DAGStatusBuilder getDAGStatus(Set statusOptions); + VertexStatusBuilder getVertexStatus(String vertexName, + Set statusOptions); boolean isComplete(); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java index caab317..737091a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java @@ -21,6 +21,7 @@ package org.apache.tez.dag.app.dag; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.tez.common.counters.TezCounters; @@ -28,6 +29,7 @@ import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.VertexLocationHint; +import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import org.apache.tez.dag.api.client.ProgressBuilder; @@ -68,7 +70,8 @@ public interface Vertex extends Comparable { int getRunningTasks(); float getProgress(); ProgressBuilder getVertexProgress(); - VertexStatusBuilder getVertexStatus(); + VertexStatusBuilder getVertexStatus(Set statusOptions); + void setParallelism(int parallelism, Map sourceEdgeManagers); void setVertexLocationHint(VertexLocationHint vertexLocationHint); @@ -79,13 +82,13 @@ public interface Vertex extends Comparable { Map getInputVertices(); Map getOutputVertices(); - + void setAdditionalInputs(List inputs); void setAdditionalOutputs(List outputs); - + Map> getAdditionalInputs(); Map> getAdditionalOutputs(); - + List getInputSpecList(int taskIndex); List getOutputSpecList(int taskIndex); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 0df7875..d16086b 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -56,6 +56,7 @@ import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.DAGStatusBuilder; import org.apache.tez.dag.api.client.ProgressBuilder; +import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.VertexStatusBuilder; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.api.records.DAGProtos.EdgePlan; @@ -510,7 +511,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, // monitoring apis @Override - public DAGStatusBuilder getDAGStatus() { + public DAGStatusBuilder getDAGStatus(Set statusOptions) { DAGStatusBuilder status = new DAGStatusBuilder(); int totalTaskCount = 0; int totalSucceededTaskCount = 0; @@ -537,6 +538,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, status.setState(getState()); status.setDiagnostics(diagnostics); status.setDAGProgress(dagProgress); + if (statusOptions.contains(StatusGetOpts.GET_COUNTERS)) { + status.setDAGCounters(getAllCounters()); + } return status; } finally { readLock.unlock(); @@ -544,12 +548,13 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, } @Override - public VertexStatusBuilder getVertexStatus(String vertexName) { + public VertexStatusBuilder getVertexStatus(String vertexName, + Set statusOptions) { Vertex vertex = vertexMap.get(vertexName); if(vertex == null) { return null; } - return vertex.getVertexStatus(); + return vertex.getVertexStatus(statusOptions); } @@ -1032,10 +1037,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, /** * Set the terminationCause and send a kill-message to all vertices. * The vertex-kill messages are only sent once. - * @param the trigger that is causing the DAG to transition to KILLED/FAILED - * @param event The type of kill event to send to the vertices. */ - void enactKill(DAGTerminationCause dagTerminationCause, VertexTerminationCause vertexTerminationCause) { + void enactKill(DAGTerminationCause dagTerminationCause, + VertexTerminationCause vertexTerminationCause) { if(trySetTerminationCause(dagTerminationCause)){ for (Vertex v : vertices.values()) { @@ -1100,7 +1104,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, job.numCompletedVertices++; if (vertexEvent.getVertexState() == VertexState.SUCCEEDED) { if (!job.reRunningVertices.contains(vertex.getVertexId())) { - // vertex succeeded for the first time + // vertex succeeded for the first time job.dagScheduler.vertexCompleted(vertex); } job.vertexSucceeded(vertex); @@ -1117,7 +1121,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, job.vertexKilled(vertex); forceTransitionToKillWait = true; } - + job.reRunningVertices.remove(vertex.getVertexId()); LOG.info("Vertex " + vertex.getVertexId() + " completed." http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java index 3d4703b..8a587c1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java @@ -109,11 +109,11 @@ public class TaskImpl implements Task, EventHandler { private long scheduledTime; protected TaskLocationHint locationHint; - + private List tezEventsForTaskAttempts = new ArrayList(); - private static final List EMPTY_TASK_ATTEMPT_TEZ_EVENTS = + private static final List EMPTY_TASK_ATTEMPT_TEZ_EVENTS = new ArrayList(0); - + // counts the number of attempts that are either running or in a state where // they will come to be running when they get a Container @@ -162,8 +162,8 @@ public class TaskImpl implements Task, EventHandler { .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED, TaskEventType.T_ADD_TEZ_EVENT, ADD_TEZ_EVENT_TRANSITION) - // When current attempt fails/killed and new attempt launched then - // TODO Task should go back to SCHEDULED state TEZ-495 + // When current attempt fails/killed and new attempt launched then + // TODO Task should go back to SCHEDULED state TEZ-495 // Transitions from RUNNING state .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING, TaskEventType.T_ATTEMPT_LAUNCHED) //more attempts may start later @@ -444,7 +444,7 @@ public class TaskImpl implements Task, EventHandler { readLock.unlock(); } } - + @Override public List getTaskAttemptTezEvents(TezTaskAttemptID attemptID, int fromEventId, int maxEvents) { @@ -464,39 +464,39 @@ public class TaskImpl implements Task, EventHandler { events = Collections.unmodifiableList(new ArrayList( tezEventsForTaskAttempts.subList(fromEventId, toEventId))); LOG.info("TaskAttempt:" + attemptID + " sent events: (" + fromEventId - + "-" + toEventId + ")"); - // currently not modifying the events so that we dont have to create + + "-" + toEventId + ")"); + // currently not modifying the events so that we dont have to create // copies of events. e.g. if we have to set taskAttemptId into the TezEvent // destination metadata then we will need to create a copy of the TezEvent - // and then modify the metadata and then send the copy on the RPC. This - // is important because TezEvents are only routed in the AM and not copied - // during routing. So e.g. a broadcast edge will send the same event to - // all consumers (like it should). If copies were created then re-routing - // the events on parallelism changes would be difficult. We would have to - // buffer the events in the Vertex until the parallelism was set and then + // and then modify the metadata and then send the copy on the RPC. This + // is important because TezEvents are only routed in the AM and not copied + // during routing. So e.g. a broadcast edge will send the same event to + // all consumers (like it should). If copies were created then re-routing + // the events on parallelism changes would be difficult. We would have to + // buffer the events in the Vertex until the parallelism was set and then // route the events. } return events; } finally { readLock.unlock(); - } + } } - - @Override + + @Override public List getAndClearTaskTezEvents() { readLock.lock(); try { List events = tezEventsForTaskAttempts; - tezEventsForTaskAttempts = new ArrayList(); + tezEventsForTaskAttempts = new ArrayList(); return events; } finally { readLock.unlock(); - } + } } @Override public List getDiagnostics() { - List diagnostics = new ArrayList(5); + List diagnostics = new ArrayList(attempts.size()); readLock.lock(); try { for (TaskAttempt att : attempts.values()) { @@ -616,7 +616,7 @@ public class TaskImpl implements Task, EventHandler { if (ta == null) { throw new TezUncheckedException("Unknown task for commit: " + taskAttemptID); } - // Its ok to get a non-locked state snapshot since we handle changes of + // Its ok to get a non-locked state snapshot since we handle changes of // state in the task attempt. Dont want to deadlock here. TaskAttemptState taState = ta.getStateNoLock(); if (taState == TaskAttemptState.RUNNING) { @@ -624,7 +624,7 @@ public class TaskImpl implements Task, EventHandler { LOG.info(taskAttemptID + " given a go for committing the task output."); return true; } else { - LOG.info(taskAttemptID + " with state: " + taState + + LOG.info(taskAttemptID + " with state: " + taState + " given a no-go for commit because its not running."); return false; } @@ -636,14 +636,14 @@ public class TaskImpl implements Task, EventHandler { // Don't think this can be a pluggable decision, so simply raise an // event for the TaskAttempt to delete its output. // Wait for commit attempt to succeed. Dont kill this. If commit - // attempt fails then choose a different committer. When commit attempt + // attempt fails then choose a different committer. When commit attempt // succeeds then this and others will be killed LOG.info(commitAttempt + " is current committer. Commit waiting for: " + taskAttemptID); return false; } - + } finally { writeLock.unlock(); } @@ -654,7 +654,7 @@ public class TaskImpl implements Task, EventHandler { public boolean needsWaitAfterOutputConsumable() { Vertex vertex = getVertex(); ProcessorDescriptor processorDescriptor = vertex.getProcessorDescriptor(); - if (processorDescriptor != null && + if (processorDescriptor != null && processorDescriptor.getClassName().contains("InitialTaskWithInMemSort")) { return true; } else { @@ -929,17 +929,17 @@ public class TaskImpl implements Task, EventHandler { @Override public void transition(TaskImpl task, TaskEvent event) { TezTaskAttemptID successTaId = ((TaskEventTAUpdate) event).getTaskAttemptID(); - - if (task.commitAttempt != null && + + if (task.commitAttempt != null && !task.commitAttempt.equals(successTaId)) { // The succeeded attempt is not the one that was selected to commit // This is impossible and has to be a bug - throw new TezUncheckedException("TA: " + successTaId - + " succeeded but TA: " + task.commitAttempt + throw new TezUncheckedException("TA: " + successTaId + + " succeeded but TA: " + task.commitAttempt + " was expected to commit and succeed"); } - - task.handleTaskAttemptCompletion(successTaId, + + task.handleTaskAttemptCompletion(successTaId, TaskAttemptStateInternal.SUCCEEDED); task.finishedAttempts++; --task.numberUncompletedAttempts; @@ -976,7 +976,7 @@ public class TaskImpl implements Task, EventHandler { @Override public void transition(TaskImpl task, TaskEvent event) { TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event; - if (task.commitAttempt !=null && + if (task.commitAttempt !=null && castEvent.getTaskAttemptID().equals(task.commitAttempt)) { task.commitAttempt = null; } @@ -1028,7 +1028,7 @@ public class TaskImpl implements Task, EventHandler { public TaskStateInternal transition(TaskImpl task, TaskEvent event) { task.failedAttempts++; TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event; - if (task.commitAttempt != null && + if (task.commitAttempt != null && castEvent.getTaskAttemptID().equals(task.commitAttempt)) { task.commitAttempt = null; } @@ -1096,17 +1096,17 @@ public class TaskImpl implements Task, EventHandler { // fails, we have to let AttemptFailedTransition.transition // believe that there's no redundancy. unSucceed(task); - + // fake values for code for super.transition ++task.numberUncompletedAttempts; task.finishedAttempts--; - TaskStateInternal returnState = super.transition(task, event); - + TaskStateInternal returnState = super.transition(task, event); + if (returnState == TaskStateInternal.SCHEDULED) { // tell the dag about the rescheduling - task.eventHandler.handle(new VertexEventTaskReschedule(task.taskId)); + task.eventHandler.handle(new VertexEventTaskReschedule(task.taskId)); } - + return returnState; } @@ -1184,8 +1184,8 @@ public class TaskImpl implements Task, EventHandler { logMsg)); } } - - private static class AddTezEventTransition + + private static class AddTezEventTransition implements SingleArcTransition { @Override public void transition(TaskImpl task, TaskEvent event) { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index df99bb7..257984f 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -62,6 +62,7 @@ import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.api.VertexLocationHint; import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; import org.apache.tez.dag.api.client.ProgressBuilder; +import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.VertexStatus; import org.apache.tez.dag.api.client.VertexStatusBuilder; import org.apache.tez.dag.api.committer.NullVertexOutputCommitter; @@ -240,7 +241,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, .addTransition(VertexState.INITIALIZING, VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) - + // Transitions from INITED state // SOURCE_VERTEX_STARTED - for srces which detemrine parallelism, they must complete before this vertex can start. .addTransition(VertexState.INITED, VertexState.INITED, @@ -318,8 +319,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, VertexState.SUCCEEDED, VertexState.ERROR, VertexEventType.V_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) - .addTransition(VertexState.SUCCEEDED, - EnumSet.of(VertexState.RUNNING, VertexState.FAILED), + .addTransition(VertexState.SUCCEEDED, + EnumSet.of(VertexState.RUNNING, VertexState.FAILED), VertexEventType.V_TASK_RESCHEDULED, new TaskRescheduledAfterVertexSuccessTransition()) @@ -329,7 +330,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, VertexEventType.V_TASK_ATTEMPT_COMPLETED, // after we are done reruns of source tasks should not affect // us. These reruns may be triggered by other consumer vertices. - // We should have been in RUNNING state if we had triggered the + // We should have been in RUNNING state if we had triggered the // reruns. VertexEventType.V_SOURCE_TASK_ATTEMPT_COMPLETED, // accumulate these in case we get restarted @@ -432,9 +433,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, List pendingReportedSrcCompletions = Lists.newLinkedList(); private RootInputInitializerRunner rootInputInitializer; - + private VertexScheduler vertexScheduler; - + private boolean parallelismSet = false; private VertexOutputCommitter committer; @@ -558,10 +559,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, try { // does it matter to create a duplicate list for efficiency // instead of traversing the map - // local assign to LinkedHashMap to ensure that sequential traversal + // local assign to LinkedHashMap to ensure that sequential traversal // assumption is satisfied LinkedHashMap taskList = tasks; - int i=0; + int i=0; for(Map.Entry entry : taskList.entrySet()) { if(taskIndex == i) { return entry.getValue(); @@ -681,13 +682,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } @Override - public VertexStatusBuilder getVertexStatus() { + public VertexStatusBuilder getVertexStatus( + Set statusOptions) { this.readLock.lock(); try { VertexStatusBuilder status = new VertexStatusBuilder(); status.setState(getInternalState()); status.setDiagnostics(diagnostics); status.setProgress(getVertexProgress()); + if (statusOptions.contains(StatusGetOpts.GET_COUNTERS)) { + status.setVertexCounters(getAllCounters()); + } return status; } finally { this.readLock.unlock(); @@ -752,7 +757,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } } - // TODO Create InputReadyVertexManager that schedules when there is something + // TODO Create InputReadyVertexManager that schedules when there is something // to read and use that as default instead of ImmediateStart.TEZ-480 @Override public void scheduleTasks(Collection taskIDs) { @@ -775,7 +780,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, Preconditions.checkState(parallelismSet == false, "Parallelism can only be set dynamically once per vertex"); parallelismSet = true; - + // Input initializer expected to set parallelism. if (numTasks == -1) { Preconditions @@ -789,7 +794,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, // INITIALIZING state. return; } - + if (parallelism >= numTasks) { // not that hard to support perhaps. but checking right now since there // is no use case for it and checking may catch other bugs. @@ -800,16 +805,16 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, LOG.info("Ingoring setParallelism to current value: " + parallelism); return; } - + // start buffering incoming events so that we can re-route existing events for (Edge edge : sourceVertices.values()) { edge.startEventBuffering(); } - + // Use a set since the same event may have been sent to multiple tasks // and we want to avoid duplicates Set pendingEvents = new HashSet(); - + LOG.info("Vertex " + getVertexId() + " parallelism set to " + parallelism); // assign to local variable of LinkedHashMap to make sure that changing // type of task causes compile error. We depend on LinkedHashMap for order @@ -842,27 +847,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, Vertex sourceVertex = entry.getKey(); EdgeManager edgeManager = entry.getValue(); Edge edge = sourceVertices.get(sourceVertex); - LOG.info("Replacing edge manager for source:" + LOG.info("Replacing edge manager for source:" + sourceVertex.getVertexId() + " destination: " + getVertexId()); edge.setEdgeManager(edgeManager); } } - + // Re-route all existing TezEvents according to new routing table - // At this point only events attributed to source task attempts can be - // re-routed. e.g. DataMovement or InputFailed events. + // At this point only events attributed to source task attempts can be + // re-routed. e.g. DataMovement or InputFailed events. // This assumption is fine for now since these tasks haven't been started. - // So they can only get events generated from source task attempts that + // So they can only get events generated from source task attempts that // have already been started. DAG dag = getDAG(); for(TezEvent event : pendingEvents) { TezVertexID sourceVertexId = event.getSourceInfo().getTaskAttemptID() - .getTaskID().getVertexID(); + .getTaskID().getVertexID(); Vertex sourceVertex = dag.getVertex(sourceVertexId); Edge sourceEdge = sourceVertices.get(sourceVertex); sourceEdge.sendTezEventToDestinationTasks(event); } - + // stop buffering events for (Edge edge : sourceVertices.values()) { edge.stopEventBuffering(); @@ -871,7 +876,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } finally { writeLock.unlock(); } - + } public void setVertexLocationHint(VertexLocationHint vertexLocationHint) { @@ -1104,7 +1109,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } return finalState; } - + VertexState finished(VertexState finalState) { return finished(finalState, null); } @@ -1114,7 +1119,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, // Answer: Do commit for every vertex // for now, only for leaf vertices // TODO TEZ-41 make commmitter type configurable per vertex - + if (!this.additionalOutputSpecs.isEmpty()) { committer = new MRVertexOutputCommitter(); } @@ -1138,7 +1143,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, return VertexState.INITED; } - + /** * If the number of tasks are greater than the configured value * throw an exception that will fail job initialization @@ -1146,7 +1151,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, private void checkTaskLimits() { // no code, for now } - + private void createTasks() { Configuration conf = this.conf; boolean useNullLocationHint = true; @@ -1181,7 +1186,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } } - + public static class InitTransition implements MultipleArcTransition { @@ -1259,7 +1264,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } vertex.checkTaskLimits(); - + // Create tasks based on initial configuration, but don't start them yet. if (vertex.numTasks == -1) { LOG.info("Num tasks is -1. Expecting VertexManager/InputInitializers" @@ -1277,11 +1282,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, float waves = vertex.conf.getFloat( TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES, TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT); - + int numTasks = (int)((totalResource*waves)/taskResource); - - LOG.info("Vertex " + vertex.getVertexId() + " asking for " + numTasks - + " tasks. Headroom: " + totalResource + " Task Resource: " + + LOG.info("Vertex " + vertex.getVertexId() + " asking for " + numTasks + + " tasks. Headroom: " + totalResource + " Task Resource: " + taskResource + " waves: " + waves); vertex.rootInputInitializer = vertex.createRootInputInitializerRunner( @@ -1301,7 +1306,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, return vertex.initializeVertex(); } } // end of InitTransition - + @VisibleForTesting protected RootInputInitializerRunner createRootInputInitializerRunner( String dagName, String vertexName, TezVertexID vertexID, @@ -1309,7 +1314,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, return new RootInputInitializerRunner(dagName, vertexName, vertexID, eventHandler, numTasks); } - + public static class RootInputInitializedTransition implements MultipleArcTransition { @@ -1388,9 +1393,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, vertex.startTimeRequested = vertex.clock.getTime(); vertex.startSignalPending = true; } - + } - + public static class StartTransition implements SingleArcTransition { /** @@ -1409,10 +1414,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, vertexScheduler.onVertexStarted(pendingReportedSrcCompletions); pendingReportedSrcCompletions.clear(); logJobHistoryVertexStartedEvent(); - + // TODO: Metrics //job.metrics.runningJob(job); - + // default behavior is to start immediately. so send information about us // starting to downstream vertices. If the connections/structure of this // vertex is not fully defined yet then we could send this event later @@ -1557,7 +1562,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, + " attempt: " + completionEvent.getTaskAttemptId() + " with state: " + completionEvent.getTaskAttemptState() + " vertexState: " + vertex.getState()); - + if (TaskAttemptStateInternal.SUCCEEDED.equals(completionEvent .getTaskAttemptState())) { vertex.numSuccessSourceAttemptCompletions++; @@ -1675,25 +1680,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, vertex.eventHandler.handle(new DAGEventVertexReRunning(vertex.getVertexId())); return VertexState.RUNNING; } - + LOG.info(vertex.getVertexId() + " failed due to post-commit rescheduling of " + ((VertexEventTaskReschedule)event).getTaskID()); // terminate any running tasks vertex.enactKill(VertexTerminationCause.OWN_TASK_FAILURE, TaskTerminationCause.OWN_TASK_FAILURE); - // since the DAG thinks this vertex is completed it must be notified of + // since the DAG thinks this vertex is completed it must be notified of // an error vertex.eventHandler.handle(new DAGEvent(vertex.getDAGId(), DAGEventType.INTERNAL_ERROR)); return VertexState.FAILED; } } - + private void addDiagnostic(String diag) { diagnostics.add(diag); } - - private static boolean isEventFromVertex(Vertex vertex, + + private static boolean isEventFromVertex(Vertex vertex, EventMetaData sourceMeta) { if (!sourceMeta.getTaskVertexName().equals(vertex.getName())) { return false; @@ -1701,7 +1706,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, return true; } - private static void checkEventSourceMetadata(Vertex vertex, + private static void checkEventSourceMetadata(Vertex vertex, EventMetaData sourceMeta) { if (!isEventFromVertex(vertex, sourceMeta)) { throw new TezUncheckedException("Bad routing of event" @@ -1754,7 +1759,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, Vertex destVertex = vertex.getDAG().getVertex(sourceMeta.getEdgeVertexName()); Edge destEdge = vertex.targetVertices.get(destVertex); if (destEdge == null) { - throw new TezUncheckedException("Bad destination vertex: " + + throw new TezUncheckedException("Bad destination vertex: " + sourceMeta.getEdgeVertexName() + " for event vertex: " + vertex.getVertexId()); } @@ -1766,7 +1771,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex( sourceMeta.getTaskVertexName())); if (srcEdge == null) { - throw new TezUncheckedException("Bad source vertex: " + + throw new TezUncheckedException("Bad source vertex: " + sourceMeta.getTaskVertexName() + " for destination vertex: " + vertex.getVertexId()); } @@ -1830,7 +1835,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, } } } - + private static class InternalErrorTransition implements SingleArcTransition { @Override @@ -1861,7 +1866,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, "For now, only a single root input can be specified on a Vertex"); this.additionalInputs = Maps.newHashMapWithExpectedSize(inputs.size()); for (RootInputLeafOutputProto input : inputs) { - + InputDescriptor id = DagTypeConverters .convertInputDescriptorFromDAGPlan(input.getEntityDescriptor()); @@ -1872,7 +1877,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, InputSpec inputSpec = new InputSpec(input.getName(), id, 0); additionalInputSpecs.add(inputSpec); } - + } @Override @@ -1880,7 +1885,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, LOG.info("setting additional outputs for vertex " + this.vertexName); this.additionalOutputs = Maps.newHashMapWithExpectedSize(outputs.size()); for (RootInputLeafOutputProto output : outputs) { - + OutputDescriptor od = DagTypeConverters .convertOutputDescriptorFromDAGPlan(output.getEntityDescriptor()); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java index 5299431..23f5c72 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java @@ -18,11 +18,20 @@ package org.apache.tez.mapreduce.examples; +import java.io.IOException; import java.text.DecimalFormat; +import java.util.Collections; +import java.util.EnumSet; +import java.util.Set; import org.apache.hadoop.util.ProgramDriver; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.Progress; +import org.apache.tez.dag.api.client.StatusGetOpts; +import org.apache.tez.dag.api.client.VertexStatus; import org.apache.tez.mapreduce.examples.terasort.TeraGen; import org.apache.tez.mapreduce.examples.terasort.TeraSort; import org.apache.tez.mapreduce.examples.terasort.TeraValidate; @@ -85,7 +94,17 @@ public class ExampleDriver { System.exit(exitCode); } - public static void printMRRDAGStatus(DAGStatus dagStatus) { + public static void printDAGStatus(DAGClient dagClient, String[] vertexNames) + throws IOException, TezException { + printDAGStatus(dagClient, vertexNames, false, false); + } + + public static void printDAGStatus(DAGClient dagClient, String[] vertexNames, + boolean displayDAGCounters, boolean displayVertexCounters) + throws IOException, TezException { + Set opts = EnumSet.of(StatusGetOpts.GET_COUNTERS); + DAGStatus dagStatus = dagClient.getDAGStatus( + (displayDAGCounters ? opts : null)); Progress progress = dagStatus.getDAGProgress(); double vProgressFloat = 0.0f; if (progress != null) { @@ -96,9 +115,10 @@ public class ExampleDriver { + (progress.getTotalTaskCount() < 0 ? formatter.format(0.0f) : formatter.format((double)(progress.getSucceededTaskCount()) /progress.getTotalTaskCount()))); - final String[] vNames = { "initialmap", "ivertex1", "finalreduce" }; - for (String vertexName : vNames) { - Progress vProgress = dagStatus.getVertexProgress().get(vertexName); + for (String vertexName : vertexNames) { + VertexStatus vStatus = dagClient.getVertexStatus(vertexName, + (displayVertexCounters ? opts : null)); + Progress vProgress = vStatus.getProgress(); if (vProgress != null) { vProgressFloat = 0.0f; if (vProgress.getTotalTaskCount() == 0) { @@ -113,6 +133,19 @@ public class ExampleDriver { : vertexName) + " Progress: " + formatter.format(vProgressFloat)); } + if (displayVertexCounters) { + TezCounters counters = vStatus.getVertexCounters(); + if (counters != null) { + System.out.println("Vertex Counters for " + vertexName + ": " + + counters); + } + } + } + } + if (displayDAGCounters) { + TezCounters counters = dagStatus.getDAGCounters(); + if (counters != null) { + System.out.println("DAG Counters: " + counters); } } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java index bd032e2..a73b3fc 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java @@ -93,7 +93,7 @@ public class FilterLinesByWord { String [] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); boolean generateSplitsInClient = false; - + SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser(); try { generateSplitsInClient = splitCmdLineParser.parse(otherArgs, false); @@ -196,7 +196,7 @@ public class FilterLinesByWord { Map stage1Env = new HashMap(); MRHelpers.updateEnvironmentForMRTasks(stage1Conf, stage1Env, true); stage1Vertex.setTaskEnvironment(stage1Env); - + // Configure the Input for stage1 Class initializerClazz = generateSplitsInClient ? null : MRInputAMSplitGenerator.class; @@ -214,7 +214,7 @@ public class FilterLinesByWord { Map stage2Env = new HashMap(); MRHelpers.updateEnvironmentForMRTasks(stage2Conf, stage2Env, false); stage2Vertex.setTaskEnvironment(stage2Env); - + // Configure the Output for stage2 stage2Vertex.addOutput("MROutput", new OutputDescriptor(MROutput.class.getName()).setUserPayload(MRHelpers @@ -233,9 +233,10 @@ public class FilterLinesByWord { LOG.info("Submitted DAG to Tez Session"); DAGStatus dagStatus = null; + String[] vNames = { "stage1", "stage2" }; try { while (true) { - dagStatus = dagClient.getDAGStatus(); + dagStatus = dagClient.getDAGStatus(null); if(dagStatus.getState() == DAGStatus.State.RUNNING || dagStatus.getState() == DAGStatus.State.SUCCEEDED || dagStatus.getState() == DAGStatus.State.FAILED || @@ -252,13 +253,13 @@ public class FilterLinesByWord { while (dagStatus.getState() == DAGStatus.State.RUNNING) { try { - ExampleDriver.printMRRDAGStatus(dagStatus); + ExampleDriver.printDAGStatus(dagClient, vNames); try { Thread.sleep(1000); } catch (InterruptedException e) { // continue; } - dagStatus = dagClient.getDAGStatus(); + dagStatus = dagClient.getDAGStatus(null); } catch (TezException e) { LOG.fatal("Failed to get application progress. Exiting"); System.exit(-1); @@ -269,24 +270,24 @@ public class FilterLinesByWord { tezSession.stop(); } - ExampleDriver.printMRRDAGStatus(dagStatus); + ExampleDriver.printDAGStatus(dagClient, vNames); LOG.info("Application completed. " + "FinalState=" + dagStatus.getState()); System.exit(dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1); } - + public static class TextLongPair implements Writable { private Text text; private LongWritable longWritable; - + public TextLongPair() { } - + public TextLongPair(Text text, LongWritable longWritable) { this.text = text; this.longWritable = longWritable; } - + @Override public void write(DataOutput out) throws IOException { this.text.write(out); @@ -300,10 +301,10 @@ public class FilterLinesByWord { text.readFields(in); longWritable.readFields(in); } - + @Override public String toString() { return text.toString() + "\t" + longWritable.get(); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java index 29d6db5..60ce3da 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/GroupByOrderByMRRTest.java @@ -197,9 +197,10 @@ public class GroupByOrderByMRRTest { ApplicationId appId = TypeConverter.toYarn(jobId).getAppId(); DAGClient dagClient = tezClient.getDAGClient(appId); - DAGStatus dagStatus = null; + DAGStatus dagStatus; + String[] vNames = { "initialmap" , "ireduce1" , "finalreduce" }; while (true) { - dagStatus = dagClient.getDAGStatus(); + dagStatus = dagClient.getDAGStatus(null); if(dagStatus.getState() == DAGStatus.State.RUNNING || dagStatus.getState() == DAGStatus.State.SUCCEEDED || dagStatus.getState() == DAGStatus.State.FAILED || @@ -216,20 +217,20 @@ public class GroupByOrderByMRRTest { while (dagStatus.getState() == DAGStatus.State.RUNNING) { try { - ExampleDriver.printMRRDAGStatus(dagStatus); + ExampleDriver.printDAGStatus(dagClient, vNames); try { Thread.sleep(1000); } catch (InterruptedException e) { // continue; } - dagStatus = dagClient.getDAGStatus(); + dagStatus = dagClient.getDAGStatus(null); } catch (TezException e) { LOG.fatal("Failed to get application progress. Exiting"); System.exit(-1); } } - ExampleDriver.printMRRDAGStatus(dagStatus); + ExampleDriver.printDAGStatus(dagClient, vNames); LOG.info("Application completed. " + "FinalState=" + dagStatus.getState()); System.exit(dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6fddbd01/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java index 8943dfa..edea15b 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java @@ -834,7 +834,7 @@ public class MRRSleepJob extends Configured implements Tool { tezClient.submitDAGApplication(appId, dag, amConfig); while (true) { - DAGStatus status = dagClient.getDAGStatus(); + DAGStatus status = dagClient.getDAGStatus(null); LOG.info("DAG Status: " + status); if (status.isCompleted()) { break;