Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 98D951888E for ; Thu, 6 Aug 2015 09:25:57 +0000 (UTC) Received: (qmail 53838 invoked by uid 500); 6 Aug 2015 09:25:54 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 53677 invoked by uid 500); 6 Aug 2015 09:25:54 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 52857 invoked by uid 99); 6 Aug 2015 09:25:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 06 Aug 2015 09:25:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DEE66E03EC; Thu, 6 Aug 2015 09:25:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@tez.apache.org Date: Thu, 06 Aug 2015 09:26:14 -0000 Message-Id: <1fa66ccd2c874c128002c7dc68b6136e@git.apache.org> In-Reply-To: <10a0859464864279b3a9f48415e4a976@git.apache.org> References: <10a0859464864279b3a9f48415e4a976@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [22/51] [abbrv] tez git commit: TEZ-2388. Send dag identifier as part of the fetcher request string. (sseth) TEZ-2388. Send dag identifier as part of the fetcher request string. (sseth) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9ed22b2c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9ed22b2c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9ed22b2c Branch: refs/heads/TEZ-2003 Commit: 9ed22b2c1e3afaf2e232e5824a29fe662ff71d8e Parents: 5774527 Author: Siddharth Seth Authored: Wed Apr 29 08:20:05 2015 -0700 Committer: Siddharth Seth Committed: Thu Aug 6 01:25:34 2015 -0700 ---------------------------------------------------------------------- TEZ-2003-CHANGES.txt | 1 + .../tez/runtime/library/common/shuffle/Fetcher.java | 14 ++++++++------ .../runtime/library/common/shuffle/ShuffleUtils.java | 8 +++++--- .../library/common/shuffle/impl/ShuffleManager.java | 2 +- .../ShuffleInputEventHandlerOrderedGrouped.java | 2 +- .../runtime/library/common/shuffle/TestFetcher.java | 6 +++--- 6 files changed, 19 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/9ed22b2c/TEZ-2003-CHANGES.txt ---------------------------------------------------------------------- diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt index d42aaf8..9fc9ed3 100644 --- a/TEZ-2003-CHANGES.txt +++ b/TEZ-2003-CHANGES.txt @@ -19,5 +19,6 @@ ALL CHANGES: TEZ-2347. Expose additional information in TaskCommunicatorContext. TEZ-2361. Propagate dag completion to TaskCommunicator. TEZ-2381. Fixes after rebase 04/28. + TEZ-2388. Send dag identifier as part of the fetcher request string. INCOMPATIBLE CHANGES: http://git-wip-us.apache.org/repos/asf/tez/blob/9ed22b2c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java index 08b59ed..1092685 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java @@ -89,6 +89,7 @@ public class Fetcher extends CallableWithNdc { private final FetcherCallback fetcherCallback; private final FetchedInputAllocator inputManager; private final ApplicationId appId; + private final int dagIdentifier; private final String logIdentifier; @@ -130,7 +131,7 @@ public class Fetcher extends CallableWithNdc { private final boolean isDebugEnabled = LOG.isDebugEnabled(); private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params, - FetchedInputAllocator inputManager, ApplicationId appId, + FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretManager, String srcNameTrimmed, Configuration conf, RawLocalFileSystem localFs, LocalDirAllocator localDirAllocator, @@ -144,6 +145,7 @@ public class Fetcher extends CallableWithNdc { this.inputManager = inputManager; this.jobTokenSecretMgr = jobTokenSecretManager; this.appId = appId; + this.dagIdentifier = dagIdentifier; this.pathToAttemptMap = new HashMap(); this.httpConnectionParams = params; this.conf = conf; @@ -417,7 +419,7 @@ public class Fetcher extends CallableWithNdc { private HostFetchResult setupConnection(Collection attempts) { try { StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host, - port, partition, appId.toString(), httpConnectionParams.isSslShuffle()); + port, partition, appId.toString(), dagIdentifier, httpConnectionParams.isSslShuffle()); this.url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts, httpConnectionParams.isKeepAlive()); @@ -930,22 +932,22 @@ public class Fetcher extends CallableWithNdc { public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnectionParams params, FetchedInputAllocator inputManager, - ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed, + ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed, Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort, boolean asyncHttp) { - this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, + this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier, jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled, false, localHostname, shufflePort, asyncHttp); } public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnectionParams params, FetchedInputAllocator inputManager, - ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed, + ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed, Configuration conf, RawLocalFileSystem localFs, LocalDirAllocator localDirAllocator, Path lockPath, boolean localDiskFetchEnabled, boolean sharedFetchEnabled, String localHostname, int shufflePort, boolean asyncHttp) { - this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, + this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier, jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator, lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp); } http://git-wip-us.apache.org/repos/asf/tez/blob/9ed22b2c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java index 1081587..c7cc907 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java @@ -184,19 +184,21 @@ public class ShuffleUtils { // TODO NEWTEZ handle ssl shuffle public static StringBuilder constructBaseURIForShuffleHandler(String host, - int port, int partition, String appId, boolean sslShuffle) { + int port, int partition, String appId, int dagIdentifier, boolean sslShuffle) { return constructBaseURIForShuffleHandler(host + ":" + String.valueOf(port), - partition, appId, sslShuffle); + partition, appId, dagIdentifier, sslShuffle); } public static StringBuilder constructBaseURIForShuffleHandler(String hostIdentifier, - int partition, String appId, boolean sslShuffle) { + int partition, String appId, int dagIdentifier, boolean sslShuffle) { final String http_protocol = (sslShuffle) ? "https://" : "http://"; StringBuilder sb = new StringBuilder(http_protocol); sb.append(hostIdentifier); sb.append("/"); sb.append("mapOutput?job="); sb.append(appId.replace("application", "job")); + sb.append("&dag="); + sb.append(String.valueOf(dagIdentifier)); sb.append("&reduce="); sb.append(String.valueOf(partition)); sb.append("&map="); http://git-wip-us.apache.org/repos/asf/tez/blob/9ed22b2c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java index 600c332..5bfcab8 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java @@ -403,7 +403,7 @@ public class ShuffleManager implements FetcherCallback { } FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this, - httpConnectionParams, inputManager, inputContext.getApplicationId(), + httpConnectionParams, inputManager, inputContext.getApplicationId(), inputContext.getDagIdentifier(), jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator, lockDisk, localDiskFetchEnabled, sharedFetchEnabled, localhostName, shufflePort, asyncHttp); http://git-wip-us.apache.org/repos/asf/tez/blob/9ed22b2c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java index 32ac766..9481e65 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java @@ -127,7 +127,7 @@ public class ShuffleInputEventHandlerOrderedGrouped { @VisibleForTesting URI getBaseURI(String host, int port, int partitionId) { StringBuilder sb = ShuffleUtils.constructBaseURIForShuffleHandler(host, port, - partitionId, inputContext.getApplicationId().toString(), sslShuffle); + partitionId, inputContext.getApplicationId().toString(), inputContext.getDagIdentifier(), sslShuffle); URI u = URI.create(sb.toString()); return u; } http://git-wip-us.apache.org/repos/asf/tez/blob/9ed22b2c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java index 7678b18..85e3540 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java @@ -75,7 +75,7 @@ public class TestFetcher { final boolean DISABLE_LOCAL_FETCH = false; Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, - ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, + ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT, false); builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts)); Fetcher fetcher = spy(builder.build()); @@ -125,7 +125,7 @@ public class TestFetcher { // When disabled use http fetch conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false); builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null, - ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, DISABLE_LOCAL_FETCH, HOST, + ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, DISABLE_LOCAL_FETCH, HOST, PORT, false); builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts)); fetcher = spy(builder.build()); @@ -159,7 +159,7 @@ public class TestFetcher { int partition = 42; FetcherCallback callback = mock(FetcherCallback.class); Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null, - ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, HOST, PORT, false); + ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT, false); builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts)); Fetcher fetcher = spy(builder.build());