Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A043818AF9 for ; Tue, 29 Sep 2015 20:16:37 +0000 (UTC) Received: (qmail 73180 invoked by uid 500); 29 Sep 2015 20:16:37 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 73144 invoked by uid 500); 29 Sep 2015 20:16:37 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 73134 invoked by uid 99); 29 Sep 2015 20:16:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 29 Sep 2015 20:16:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4C0F3DFCCE; Tue, 29 Sep 2015 20:16:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hitesh@apache.org To: commits@tez.apache.org Message-Id: <0e702f3a38c144a3b4bd13a887c63c33@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: tez git commit: TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez. (hitesh) Date: Tue, 29 Sep 2015 20:16:37 +0000 (UTC) Repository: tez Updated Branches: refs/heads/branch-0.6 9aaea3a0c -> aa4806544 TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez. (hitesh) (cherry picked from commit 774444312f8ea586939cb85c140c94251162e731) Conflicts: CHANGES.txt tez-api/src/main/java/org/apache/tez/dag/api/DAG.java tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java tez-api/src/main/proto/DAGApiRecords.proto tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestHistoryParser.java (cherry picked from commit 1709e46cec0d954dfbea86208a7fd9bcd56b4523) Conflicts: CHANGES.txt tez-api/src/main/java/org/apache/tez/dag/api/DAG.java tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/aa480654 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/aa480654 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/aa480654 Branch: refs/heads/branch-0.6 Commit: aa4806544cf8111575329c7a23eb8d8a8e4b5bd3 Parents: 9aaea3a Author: Hitesh Shah Authored: Tue Sep 29 11:45:36 2015 -0700 Committer: Hitesh Shah Committed: Tue Sep 29 13:16:19 2015 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/client/CallerContext.java | 171 +++++++++++++++++++ .../org/apache/tez/common/ATSConstants.java | 10 +- .../java/org/apache/tez/common/TezUtils.java | 2 +- .../main/java/org/apache/tez/dag/api/DAG.java | 21 ++- .../apache/tez/dag/api/DagTypeConverters.java | 28 ++- tez-api/src/main/proto/DAGApiRecords.proto | 8 + .../java/org/apache/tez/dag/api/TestDAG.java | 25 +++ .../org/apache/tez/common/TestTezUtils.java | 12 +- .../org/apache/tez/dag/app/DAGAppMaster.java | 10 +- .../impl/HistoryEventJsonConversion.java | 17 ++ .../apache/tez/dag/history/utils/DAGUtils.java | 24 ++- .../tez/dag/history/utils/TestDAGUtils.java | 13 +- .../apache/tez/examples/OrderedWordCount.java | 1 + .../org/apache/tez/examples/TezExampleBase.java | 11 ++ .../ats/HistoryEventTimelineConversion.java | 19 ++- .../ats/TestHistoryEventTimelineConversion.java | 29 +++- .../examples/TestOrderedWordCount.java | 3 +- 18 files changed, 386 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ea7ace5..e0b76ae 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.6.3: Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-2851. Support a way for upstream applications to pass in a caller context to Tez. TEZ-2398. Flaky test: TestFaultTolerance TEZ-2808. Race condition between preemption and container assignment TEZ-2203. Intern strings in tez counters http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-api/src/main/java/org/apache/tez/client/CallerContext.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/client/CallerContext.java b/tez-api/src/main/java/org/apache/tez/client/CallerContext.java new file mode 100644 index 0000000..ba68851 --- /dev/null +++ b/tez-api/src/main/java/org/apache/tez/client/CallerContext.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.client; + +import javax.annotation.Nullable; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; + +import com.google.common.base.Preconditions; + +@Public +@Unstable +public class CallerContext { + + /** + * Context in which Tez is being invoked. + * For example, HIVE or PIG. + */ + private String context; + + /** + * Type of the caller. Should ideally be used along with callerId to uniquely identify the caller. + * When used with YARN Timeline, this should map to the Timeline Entity Type. + * For example, HIVE_QUERY_ID. + */ + private String callerType; + + /** + * Caller ID. + * An ID to uniquely identify the caller within the callerType namespace + */ + private String callerId; + + /** + * Free-form text or a json-representation of relevant meta-data. + * This can be used to describe the work being done. For example, for Hive, + * this could be the Hive query text. + */ + private String blob; + + /** + * Private Constructor + */ + private CallerContext() { + } + + /** + * Instantiate the Caller Context + * @param context Context in which Tez is being invoked. For example, HIVE or PIG. + * @param callerId Caller ID. An ID to uniquely identifier the caller within the callerType + * namespace + * @param callerType Type of the caller. Should ideally be used along with callerId to uniquely + * identify the caller. When used with YARN Timeline, this should map to + * the Timeline Entity Type. For example, HIVE_QUERY_ID. + * @param blob Free-form text or a json-representation of relevant meta-data. + * This can be used to describe the work being done. For example, for Hive, + * this could be the Hive query text. + * @return CallerContext + */ + public static CallerContext create(String context, String callerId, + String callerType, @Nullable String blob) { + return new CallerContext(context, callerId, callerType, blob); + } + + /** + * Instantiate the Caller Context + * @param context Context in which Tez is being invoked. For example, HIVE or PIG. + * @param blob Free-form text or a json-representation of relevant meta-data. + * This can be used to describe the work being done. For example, for Hive, + * this could be the Hive query text. + * @return CallerContext + */ + @Private + public static CallerContext create(String context, @Nullable String blob) { + return new CallerContext(context, blob); + } + + + private CallerContext(String context, String callerId, String callerType, + @Nullable String blob) { + if (callerId != null || callerType != null) { + setCallerIdAndType(callerId, callerType); + } + setContext(context); + setBlob(blob); + } + + private CallerContext(String context, @Nullable String blob) { + setContext(context); + setBlob(blob); + } + + public String getCallerType() { + return callerType; + } + + public String getCallerId() { + return callerId; + } + + public String getBlob() { + return blob; + } + + public String getContext() { + return context; + } + + /** + * @param context Context in which Tez is being invoked. For example, HIVE or PIG. + */ + public CallerContext setContext(String context) { + Preconditions.checkArgument(context != null && !context.isEmpty(), + "Context cannot be null or empty"); + this.context = context; + return this; + } + + /** + * @param callerId Caller ID. An ID to uniquely identifier the caller within the callerType + * namespace + * @param callerType Type of the caller. Should ideally be used along with callerId to uniquely + * identify the caller. When used with YARN Timeline, this should map to + * the Timeline Entity Type. For example, HIVE_QUERY_ID. + */ + public CallerContext setCallerIdAndType(String callerId, String callerType) { + Preconditions.checkArgument(callerType != null && !callerType.isEmpty() + && callerId != null && !callerId.isEmpty(), + "Caller Id and Caller Type cannot be null or empty"); + this.callerType = callerType; + this.callerId = callerId; + return this; + } + + /** + * @param blob Free-form text or a json-representation of relevant meta-data. + * This can be used to describe the work being done. For example, for Hive, + * this could be the Hive query text. + */ + public CallerContext setBlob(@Nullable String blob) { + this.blob = blob; + return this; + } + + @Override + public String toString() { + return "context=" + context + + ", callerType=" + callerType + + ", callerId=" + callerId + + ", blob=" + blob; + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java index fd82e20..85a669f 100644 --- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java +++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java @@ -45,6 +45,8 @@ public class ATSConstants { public static final String NODE_ID = "nodeId"; public static final String NODE_HTTP_ADDRESS = "nodeHttpAddress"; public static final String USER = "user"; + public static final String CALLER_CONTEXT_ID = "callerId"; + public static final String CALLER_CONTEXT_TYPE = "callerType"; /* Keys used in other info */ public static final String APP_SUBMIT_TIME = "appSubmitTime"; @@ -103,7 +105,7 @@ public class ATSConstants { "yarn.timeline-service.webapp.https.address"; /* History text related Keys */ - public static final String DESCRIPTION = "desc"; + public static final String DESC = "desc"; public static final String CONFIG = "config"; public static final String TEZ_VERSION = "tezVersion"; @@ -111,4 +113,10 @@ public class ATSConstants { public static final String REVISION = "revision"; public static final String BUILD_TIME = "buildTime"; + /* Caller Context Related Keys */ + public static final String CONTEXT = "context"; + public static final String CALLER_ID = "callerId"; + public static final String CALLER_TYPE = "callerType"; + public static final String DESCRIPTION = "description"; + } http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-api/src/main/java/org/apache/tez/common/TezUtils.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java index 1cbfbe0..41f3aa1 100644 --- a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java +++ b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java @@ -162,7 +162,7 @@ public class TezUtils { JSONObject jsonObject = new JSONObject(); try { if (description != null && !description.isEmpty()) { - jsonObject.put(ATSConstants.DESCRIPTION, description); + jsonObject.put(ATSConstants.DESC, description); } if (conf != null) { JSONObject confJson = new JSONObject(); http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java index db2cb39..9398fc7 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java @@ -34,6 +34,7 @@ import org.apache.commons.collections4.BidiMap; import org.apache.commons.collections4.bidimap.DualLinkedHashBidiMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tez.client.CallerContext; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.conf.Configuration; @@ -60,9 +61,7 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType; import org.apache.tez.dag.api.records.DAGProtos.VertexPlan; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Preconditions; -import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -92,6 +91,7 @@ public class DAG { private DAGAccessControls dagAccessControls; Map commonTaskLocalFiles = Maps.newHashMap(); String dagInfo; + CallerContext callerContext; private Stack topologicalVertexStack = new Stack(); @@ -165,12 +165,25 @@ public class DAG { * In the case of Hive, this could be the SQL query text. * @return {@link DAG} */ + @Deprecated public synchronized DAG setDAGInfo(String dagInfo) { Preconditions.checkNotNull(dagInfo); this.dagInfo = dagInfo; return this; } + + /** + * Set the Context in which Tez is being called. + * @param callerContext Caller Context + * @return {@link DAG} + */ + public synchronized DAG setCallerContext(CallerContext callerContext) { + Preconditions.checkNotNull(callerContext); + this.callerContext = callerContext; + return this; + } + /** * Create a group of vertices that share a common output. This can be used to implement * unions efficiently. @@ -701,6 +714,10 @@ public class DAG { DAGPlan.Builder dagBuilder = DAGPlan.newBuilder(); dagBuilder.setName(this.name); + + if (this.callerContext != null) { + dagBuilder.setCallerContext(DagTypeConverters.convertCallerContextToProto(callerContext)); + } if (this.dagInfo != null && !this.dagInfo.isEmpty()) { dagBuilder.setDagInfo(this.dagInfo); } http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java index 6c3fd0d..1a80b3e 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl; import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.tez.client.CallerContext; import org.apache.tez.client.TezAppMasterStatus; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.counters.CounterGroup; @@ -55,6 +56,7 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType; import org.apache.tez.dag.api.client.StatusGetOpts; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TezSessionStatusProto; import org.apache.tez.dag.api.records.DAGProtos; +import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto; import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; import org.apache.tez.dag.api.records.DAGProtos.EdgePlan; import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType; @@ -476,7 +478,7 @@ public class DagTypeConverters { PlanLocalResourcesProto.newBuilder(); for (Map.Entry entry : localResources.entrySet()) { PlanLocalResource plr = convertLocalResourceToPlanLocalResource( - entry.getKey(), entry.getValue()); + entry.getKey(), entry.getValue()); builder.addLocalResources(plr); } return builder.build(); @@ -686,4 +688,28 @@ public class DagTypeConverters { return payload.getPayload(); } + public static CallerContextProto convertCallerContextToProto(CallerContext callerContext) { + CallerContextProto.Builder callerContextBuilder = CallerContextProto.newBuilder(); + callerContextBuilder.setContext(callerContext.getContext()); + if (callerContext.getCallerId() != null) { + callerContextBuilder.setCallerId(callerContext.getCallerId()); + } + if (callerContext.getCallerType() != null) { + callerContextBuilder.setCallerType(callerContext.getCallerType()); + } + if (callerContext.getBlob() != null) { + callerContextBuilder.setBlob(callerContext.getBlob()); + } + return callerContextBuilder.build(); + } + + public static CallerContext convertCallerContextFromProto(CallerContextProto proto) { + CallerContext callerContext = CallerContext.create(proto.getContext(), + (proto.hasBlob() ? proto.getBlob() : null)); + if (proto.hasCallerType() && proto.hasCallerId()) { + callerContext.setCallerIdAndType(proto.getCallerId(), proto.getCallerType()); + } + return callerContext; + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-api/src/main/proto/DAGApiRecords.proto ---------------------------------------------------------------------- diff --git a/tez-api/src/main/proto/DAGApiRecords.proto b/tez-api/src/main/proto/DAGApiRecords.proto index 0539405..b71e804 100644 --- a/tez-api/src/main/proto/DAGApiRecords.proto +++ b/tez-api/src/main/proto/DAGApiRecords.proto @@ -156,6 +156,13 @@ message ConfigurationProto { repeated PlanKeyValuePair confKeyValues = 1; } +message CallerContextProto { + optional string context = 1; + optional string callerType = 2; + optional string callerId = 3; + optional string blob = 4; +} + message DAGPlan { required string name = 1; repeated VertexPlan vertex = 2; @@ -165,6 +172,7 @@ message DAGPlan { repeated PlanVertexGroupInfo vertex_groups = 6; repeated PlanLocalResource local_resource = 7; optional string dag_info = 8; + optional CallerContextProto caller_context = 10; } // DAG monitoring messages http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java index 31ced71..92c4db4 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java @@ -19,6 +19,7 @@ package org.apache.tez.dag.api; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.client.CallerContext; import org.junit.Assert; import org.junit.Test; @@ -166,4 +167,28 @@ public class TestDAG { Assert.assertEquals("Duplicated output:output_1, vertexName=v1", e.getMessage()); } } + + @Test + public void testCallerContext() { + DAG dag = DAG.create("dag1"); + try { + CallerContext callerContext = CallerContext.create("ctxt", "", "", "desc"); + Assert.fail("Expected failure for invalid args"); + } catch (Exception e) { + // Expected + } + try { + CallerContext callerContext = CallerContext.create("", "desc"); + Assert.fail("Expected failure for invalid args"); + } catch (Exception e) { + // Expected + } + + CallerContext callerContext; + callerContext = CallerContext.create("ctxt", "a", "a", "desc"); + callerContext = CallerContext.create("ctxt", "desc"); + callerContext = CallerContext.create("ctxt", null); + + } + } http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java ---------------------------------------------------------------------- diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java index d39c47f..c88fa67 100644 --- a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java +++ b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java @@ -168,7 +168,7 @@ public class TestTezUtils { JSONObject jsonObject = new JSONObject(confToJson); - Assert.assertFalse(jsonObject.has(ATSConstants.DESCRIPTION)); + Assert.assertFalse(jsonObject.has(ATSConstants.DESC)); Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG)); JSONObject confObject = jsonObject.getJSONObject(ATSConstants.CONFIG); @@ -178,8 +178,8 @@ public class TestTezUtils { confToJson = TezUtils.convertToHistoryText(desc, conf); jsonObject = new JSONObject(confToJson); - Assert.assertTrue(jsonObject.has(ATSConstants.DESCRIPTION)); - String descFromJson = jsonObject.getString(ATSConstants.DESCRIPTION); + Assert.assertTrue(jsonObject.has(ATSConstants.DESC)); + String descFromJson = jsonObject.getString(ATSConstants.DESC); Assert.assertEquals(desc, descFromJson); Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG)); @@ -201,7 +201,7 @@ public class TestTezUtils { JSONObject jsonObject = new JSONObject(confToJson); - Assert.assertFalse(jsonObject.has(ATSConstants.DESCRIPTION)); + Assert.assertFalse(jsonObject.has(ATSConstants.DESC)); Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG)); JSONObject confObject = jsonObject.getJSONObject(ATSConstants.CONFIG); @@ -213,8 +213,8 @@ public class TestTezUtils { confToJson = TezUtils.convertToHistoryText(desc, conf); jsonObject = new JSONObject(confToJson); - Assert.assertTrue(jsonObject.has(ATSConstants.DESCRIPTION)); - String descFromJson = jsonObject.getString(ATSConstants.DESCRIPTION); + Assert.assertTrue(jsonObject.has(ATSConstants.DESC)); + String descFromJson = jsonObject.getString(ATSConstants.DESC); Assert.assertEquals(desc, descFromJson); Assert.assertTrue(jsonObject.has(ATSConstants.CONFIG)); http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index cb77d6e..b4323ad 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -59,6 +59,7 @@ import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.Options; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tez.client.CallerContext; import org.apache.tez.dag.api.SessionNotRunning; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -2071,7 +2072,14 @@ public class DAGAppMaster extends AbstractService { cumulativeAdditionalResources.putAll(lrDiff); } - LOG.info("Running DAG: " + dagPlan.getName()); + String callerContextStr = ""; + if (dagPlan.hasCallerContext()) { + CallerContext callerContext = DagTypeConverters.convertCallerContextFromProto( + dagPlan.getCallerContext()); + callerContextStr = ", callerContext=" + callerContext.toString(); + } + LOG.info("Running DAG: " + dagPlan.getName() + callerContextStr); + String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime()); System.err.println(timeStamp + " Running Dag: "+ newDAG.getID()); System.out.println(timeStamp + " Running Dag: "+ newDAG.getID()); http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java index 8d7344f..22687f1 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java @@ -484,6 +484,15 @@ public class HistoryEventJsonConversion { JSONObject primaryFilters = new JSONObject(); primaryFilters.put(ATSConstants.DAG_NAME, event.getDAGName()); + if (event.getDAGPlan().hasCallerContext() + && event.getDAGPlan().getCallerContext().hasCallerId() + && event.getDAGPlan().getCallerContext().hasCallerType()) { + primaryFilters.put(ATSConstants.CALLER_CONTEXT_ID, + event.getDAGPlan().getCallerContext().getCallerId()); + primaryFilters.put(ATSConstants.CALLER_CONTEXT_TYPE, + event.getDAGPlan().getCallerContext().getCallerType()); + } + jsonObject.put(ATSConstants.PRIMARY_FILTERS, primaryFilters); // TODO decide whether this goes into different events, @@ -500,6 +509,14 @@ public class HistoryEventJsonConversion { JSONObject otherInfo = new JSONObject(); otherInfo.put(ATSConstants.DAG_PLAN, DAGUtils.generateSimpleJSONPlan(event.getDAGPlan())); + if (event.getDAGPlan().hasCallerContext() + && event.getDAGPlan().getCallerContext().hasCallerId() + && event.getDAGPlan().getCallerContext().hasCallerType()) { + otherInfo.put(ATSConstants.CALLER_CONTEXT_ID, + event.getDAGPlan().getCallerContext().getCallerId()); + otherInfo.put(ATSConstants.CALLER_CONTEXT_TYPE, + event.getDAGPlan().getCallerContext().getCallerType()); + } jsonObject.put(ATSConstants.OTHER_INFO, otherInfo); return jsonObject; http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java index 339c65c..ea43d48 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java @@ -47,10 +47,13 @@ import org.apache.tez.dag.records.TezVertexID; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; +import com.google.common.base.Preconditions; + public class DAGUtils { public static final String DAG_NAME_KEY = "dagName"; public static final String DAG_INFO_KEY = "dagInfo"; + public static final String DAG_CONTEXT_KEY = "dagContext"; public static final String VERTICES_KEY = "vertices"; public static final String EDGES_KEY = "edges"; public static final String VERTEX_GROUPS_KEY = "vertexGroups"; @@ -142,15 +145,34 @@ public class DAGUtils { return object; } + static Map createDagInfoMap(DAGPlan dagPlan) { + Preconditions.checkArgument(dagPlan.hasCallerContext()); + Map dagInfo = new TreeMap(); + dagInfo.put(ATSConstants.CONTEXT, dagPlan.getCallerContext().getContext()); + if (dagPlan.getCallerContext().hasCallerId()) { + dagInfo.put(ATSConstants.CALLER_ID, dagPlan.getCallerContext().getCallerId()); + } + if (dagPlan.getCallerContext().hasCallerType()) { + dagInfo.put(ATSConstants.CALLER_TYPE, dagPlan.getCallerContext().getCallerType()); + } + if (dagPlan.getCallerContext().hasBlob()) { + dagInfo.put(ATSConstants.DESCRIPTION, dagPlan.getCallerContext().getBlob()); + } + return dagInfo; + } + public static Map convertDAGPlanToATSMap(DAGPlan dagPlan) throws IOException { final String VERSION_KEY = "version"; - final int version = 1; + final int version = 2; Map dagMap = new LinkedHashMap(); dagMap.put(DAG_NAME_KEY, dagPlan.getName()); if (dagPlan.hasDagInfo()) { dagMap.put(DAG_INFO_KEY, dagPlan.getDagInfo()); } + if (dagPlan.hasCallerContext()) { + dagMap.put(DAG_CONTEXT_KEY, createDagInfoMap(dagPlan)); + } dagMap.put(VERSION_KEY, version); ArrayList verticesList = new ArrayList(); for (DAGProtos.VertexPlan vertexPlan : dagPlan.getVertexList()) { http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java index cb7e0c8..4d4577a 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/history/utils/TestDAGUtils.java @@ -28,6 +28,8 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.tez.client.CallerContext; +import org.apache.tez.common.ATSConstants; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; @@ -53,6 +55,7 @@ import com.google.common.collect.Sets; public class TestDAGUtils { + @SuppressWarnings("deprecation") private DAGPlan createDAG() { // Create a plan with 3 vertices: A, B, C. Group(A,B)->C Configuration conf = new Configuration(false); @@ -71,6 +74,7 @@ public class TestDAGUtils { dummyTaskCount, dummyTaskResource); DAG dag = DAG.create("testDag"); + dag.setCallerContext(CallerContext.create("context1", "callerId1", "callerType1", "desc1")); dag.setDAGInfo("dagInfo"); String groupName1 = "uv12"; org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2); @@ -113,10 +117,17 @@ public class TestDAGUtils { Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_NAME_KEY)); Assert.assertEquals("testDag", atsMap.get(DAGUtils.DAG_NAME_KEY)); Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_INFO_KEY)); + Assert.assertTrue(atsMap.containsKey(DAGUtils.DAG_CONTEXT_KEY)); + Map contextMap = (Map)atsMap.get(DAGUtils.DAG_CONTEXT_KEY); + Assert.assertEquals("context1", contextMap.get(ATSConstants.CONTEXT)); + Assert.assertEquals("callerId1", contextMap.get(ATSConstants.CALLER_ID)); + Assert.assertEquals("callerType1", contextMap.get(ATSConstants.CALLER_TYPE)); + Assert.assertEquals("desc1", contextMap.get(ATSConstants.DESCRIPTION)); + Assert.assertEquals("dagInfo", atsMap.get(DAGUtils.DAG_INFO_KEY)); Assert.assertEquals(dagPlan.getName(), atsMap.get(DAGUtils.DAG_NAME_KEY)); Assert.assertTrue(atsMap.containsKey("version")); - Assert.assertEquals(1, atsMap.get("version")); + Assert.assertEquals(2, atsMap.get("version")); Assert.assertTrue(atsMap.containsKey(DAGUtils.VERTICES_KEY)); Assert.assertTrue(atsMap.containsKey(DAGUtils.EDGES_KEY)); Assert.assertTrue(atsMap.containsKey(DAGUtils.VERTEX_GROUPS_KEY)); http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java index 9cf21d3..e9e1e6b 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java index 4bad009..1c02cde 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java @@ -24,6 +24,9 @@ import java.util.Set; import com.google.common.collect.Sets; import org.apache.commons.logging.Log; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.client.CallerContext; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; @@ -81,6 +84,14 @@ public abstract class TezExampleBase extends Configured implements Tool { public int runDag(DAG dag, boolean printCounters, Log logger) throws TezException, InterruptedException, IOException { tezClientInternal.waitTillReady(); + + CallerContext callerContext = CallerContext.create("TezExamples", + "Tez Example DAG: " + dag.getName()); + ApplicationId appId = tezClientInternal.getAppMasterApplicationId(); + if (appId != null) { + callerContext.setCallerIdAndType(appId.toString(), "TezExampleApplication"); + } + DAGClient dagClient = tezClientInternal.submitDAG(dag); Set getOpts = Sets.newHashSet(); if (printCounters) { http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java index 4c53337..e1b4d7e 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java @@ -246,7 +246,7 @@ public class HistoryEventTimelineConversion { event.getApplicationAttemptId().getApplicationId().toString()); atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, - event.getContainerId().toString()); + event.getContainerId().toString()); atsEntity.setStartTime(event.getLaunchTime()); TimelineEvent launchEvt = new TimelineEvent(); @@ -389,6 +389,15 @@ public class HistoryEventTimelineConversion { atsEntity.addPrimaryFilter(ATSConstants.APPLICATION_ID, event.getDagID().getApplicationId().toString()); + if (event.getDAGPlan().hasCallerContext() + && event.getDAGPlan().getCallerContext().hasCallerId() + && event.getDAGPlan().getCallerContext().hasCallerType()) { + atsEntity.addPrimaryFilter(ATSConstants.CALLER_CONTEXT_ID, + event.getDAGPlan().getCallerContext().getCallerId()); + atsEntity.addPrimaryFilter(ATSConstants.CALLER_CONTEXT_TYPE, + event.getDAGPlan().getCallerContext().getCallerType()); + } + try { atsEntity.addOtherInfo(ATSConstants.DAG_PLAN, DAGUtils.convertDAGPlanToATSMap(event.getDAGPlan())); @@ -402,6 +411,14 @@ public class HistoryEventTimelineConversion { atsEntity.addOtherInfo(ATSConstants.USER, event.getUser()); atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL + "_" + event.getApplicationAttemptId().getAttemptId(), event.getContainerLogs()); + if (event.getDAGPlan().hasCallerContext() + && event.getDAGPlan().getCallerContext().hasCallerId() + && event.getDAGPlan().getCallerContext().hasCallerType()) { + atsEntity.addOtherInfo(ATSConstants.CALLER_CONTEXT_ID, + event.getDAGPlan().getCallerContext().getCallerId()); + atsEntity.addOtherInfo(ATSConstants.CALLER_CONTEXT_TYPE, + event.getDAGPlan().getCallerContext().getCallerType()); + } return atsEntity; } http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java index 436d103..e1835b3 100644 --- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java +++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java @@ -30,12 +30,14 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.tez.client.CallerContext; import org.apache.tez.common.ATSConstants; import org.apache.tez.common.VersionInfo; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; import org.apache.tez.dag.api.oldrecords.TaskAttemptState; import org.apache.tez.dag.api.oldrecords.TaskState; +import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto; import org.apache.tez.dag.api.records.DAGProtos.DAGPlan; import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.app.dag.VertexState; @@ -101,7 +103,13 @@ public class TestHistoryEventTimelineConversion { tezVertexID = TezVertexID.getInstance(tezDAGID, random.nextInt()); tezTaskID = TezTaskID.getInstance(tezVertexID, random.nextInt()); tezTaskAttemptID = TezTaskAttemptID.getInstance(tezTaskID, random.nextInt()); - dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock").build(); + CallerContextProto.Builder callerContextProto = CallerContextProto.newBuilder(); + callerContextProto.setContext("ctxt"); + callerContextProto.setCallerId("Caller_ID"); + callerContextProto.setCallerType("Caller_Type"); + callerContextProto.setBlob("Desc_1"); + dagPlan = DAGPlan.newBuilder().setName("DAGPlanMock") + .setCallerContext(callerContextProto).build(); containerId = ContainerId.newInstance(applicationAttemptId, 111); nodeId = NodeId.newInstance("node", 13435); } @@ -412,18 +420,25 @@ public class TestHistoryEventTimelineConversion { Assert.assertEquals(submitTime, timelineEntity.getStartTime().longValue()); - Assert.assertEquals(3, timelineEntity.getPrimaryFilters().size()); + Assert.assertEquals(5, timelineEntity.getPrimaryFilters().size()); Assert.assertTrue( timelineEntity.getPrimaryFilters().get(ATSConstants.DAG_NAME).contains( dagPlan.getName())); Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.CALLER_CONTEXT_ID).contains( + dagPlan.getCallerContext().getCallerId())); + Assert.assertTrue( + timelineEntity.getPrimaryFilters().get(ATSConstants.CALLER_CONTEXT_TYPE).contains( + dagPlan.getCallerContext().getCallerType())); + Assert.assertTrue( timelineEntity.getPrimaryFilters().get(ATSConstants.APPLICATION_ID).contains( applicationAttemptId.getApplicationId().toString())); Assert.assertTrue( timelineEntity.getPrimaryFilters().get(ATSConstants.USER).contains(user)); - Assert.assertEquals(5, timelineEntity.getOtherInfo().size()); + Assert.assertEquals(7, timelineEntity.getOtherInfo().size()); + Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.DAG_PLAN)); Assert.assertEquals(applicationId.toString(), timelineEntity.getOtherInfo().get(ATSConstants.APPLICATION_ID)); @@ -436,6 +451,14 @@ public class TestHistoryEventTimelineConversion { Assert.assertEquals(containerLogs, timelineEntity.getOtherInfo().get(ATSConstants.IN_PROGRESS_LOGS_URL + "_" + applicationAttemptId.getAttemptId())); + Assert.assertEquals( + timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT_ID), + dagPlan.getCallerContext().getCallerId()); + Assert.assertEquals( + timelineEntity.getOtherInfo().get(ATSConstants.CALLER_CONTEXT_TYPE), + dagPlan.getCallerContext().getCallerType()); + + } @Test(timeout = 5000) http://git-wip-us.apache.org/repos/asf/tez/blob/aa480654/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java index 3efc5d8..e857a1c 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java @@ -51,6 +51,7 @@ import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.tez.client.CallerContext; import org.apache.tez.client.TezClientUtils; import org.apache.tez.client.TezClient; import org.apache.tez.common.TezUtils; @@ -275,7 +276,7 @@ public class TestOrderedWordCount extends Configured implements Tool { vertices.add(finalReduceVertex); DAG dag = DAG.create("OrderedWordCount" + dagIndex); - dag.setDAGInfo("{ \"context\": \"Tez\", \"description\": \"TestOrderedWordCount Job\" }"); + dag.setCallerContext(CallerContext.create("Tez", "TestOrderedWordCount Job")); for (int i = 0; i < vertices.size(); ++i) { dag.addVertex(vertices.get(i)); }