tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [22/51] [abbrv] tez git commit: TEZ-2388. Send dag identifier as part of the fetcher request string. (sseth)
Date Thu, 06 Aug 2015 09:26:14 GMT
TEZ-2388. Send dag identifier as part of the fetcher request string. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9ed22b2c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9ed22b2c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9ed22b2c

Branch: refs/heads/TEZ-2003
Commit: 9ed22b2c1e3afaf2e232e5824a29fe662ff71d8e
Parents: 5774527
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Apr 29 08:20:05 2015 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Thu Aug 6 01:25:34 2015 -0700

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                                  |  1 +
 .../tez/runtime/library/common/shuffle/Fetcher.java   | 14 ++++++++------
 .../runtime/library/common/shuffle/ShuffleUtils.java  |  8 +++++---
 .../library/common/shuffle/impl/ShuffleManager.java   |  2 +-
 .../ShuffleInputEventHandlerOrderedGrouped.java       |  2 +-
 .../runtime/library/common/shuffle/TestFetcher.java   |  6 +++---
 6 files changed, 19 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/9ed22b2c/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index d42aaf8..9fc9ed3 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -19,5 +19,6 @@ ALL CHANGES:
   TEZ-2347. Expose additional information in TaskCommunicatorContext.
   TEZ-2361. Propagate dag completion to TaskCommunicator.
   TEZ-2381. Fixes after rebase 04/28.
+  TEZ-2388. Send dag identifier as part of the fetcher request string.
 
 INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/9ed22b2c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 08b59ed..1092685 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -89,6 +89,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
   private final FetcherCallback fetcherCallback;
   private final FetchedInputAllocator inputManager;
   private final ApplicationId appId;
+  private final int dagIdentifier;
   
   private final String logIdentifier;
 
@@ -130,7 +131,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
   private final boolean isDebugEnabled = LOG.isDebugEnabled();
 
   private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
-      FetchedInputAllocator inputManager, ApplicationId appId,
+      FetchedInputAllocator inputManager, ApplicationId appId, int dagIdentifier,
       JobTokenSecretManager jobTokenSecretManager, String srcNameTrimmed, Configuration conf,
       RawLocalFileSystem localFs,
       LocalDirAllocator localDirAllocator,
@@ -144,6 +145,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
     this.inputManager = inputManager;
     this.jobTokenSecretMgr = jobTokenSecretManager;
     this.appId = appId;
+    this.dagIdentifier = dagIdentifier;
     this.pathToAttemptMap = new HashMap<String, InputAttemptIdentifier>();
     this.httpConnectionParams = params;
     this.conf = conf;
@@ -417,7 +419,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
   private HostFetchResult setupConnection(Collection<InputAttemptIdentifier> attempts)
{
     try {
       StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host,
-          port, partition, appId.toString(), httpConnectionParams.isSslShuffle());
+          port, partition, appId.toString(), dagIdentifier, httpConnectionParams.isSslShuffle());
       this.url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts,
           httpConnectionParams.isKeepAlive());
 
@@ -930,22 +932,22 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
 
     public FetcherBuilder(FetcherCallback fetcherCallback,
         HttpConnectionParams params, FetchedInputAllocator inputManager,
-        ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
+        ApplicationId appId, int dagIdentifier,  JobTokenSecretManager jobTokenSecretMgr,
String srcNameTrimmed,
         Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort,
         boolean asyncHttp) {
-      this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
+      this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
           jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled,
           false, localHostname, shufflePort, asyncHttp);
     }
 
     public FetcherBuilder(FetcherCallback fetcherCallback,
         HttpConnectionParams params, FetchedInputAllocator inputManager,
-        ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
+        ApplicationId appId, int dagIdentifier, JobTokenSecretManager jobTokenSecretMgr,
String srcNameTrimmed,
         Configuration conf, RawLocalFileSystem localFs,
         LocalDirAllocator localDirAllocator, Path lockPath,
         boolean localDiskFetchEnabled, boolean sharedFetchEnabled,
         String localHostname, int shufflePort, boolean asyncHttp) {
-      this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
+      this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId, dagIdentifier,
           jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
           lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort,
asyncHttp);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/9ed22b2c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 1081587..c7cc907 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -184,19 +184,21 @@ public class ShuffleUtils {
 
   // TODO NEWTEZ handle ssl shuffle
   public static StringBuilder constructBaseURIForShuffleHandler(String host,
-      int port, int partition, String appId, boolean sslShuffle) {
+      int port, int partition, String appId, int dagIdentifier, boolean sslShuffle) {
     return constructBaseURIForShuffleHandler(host + ":" + String.valueOf(port),
-      partition, appId, sslShuffle);
+      partition, appId, dagIdentifier, sslShuffle);
   }
   
   public static StringBuilder constructBaseURIForShuffleHandler(String hostIdentifier,
-      int partition, String appId, boolean sslShuffle) {
+      int partition, String appId, int dagIdentifier, boolean sslShuffle) {
     final String http_protocol = (sslShuffle) ? "https://" : "http://";
     StringBuilder sb = new StringBuilder(http_protocol);
     sb.append(hostIdentifier);
     sb.append("/");
     sb.append("mapOutput?job=");
     sb.append(appId.replace("application", "job"));
+    sb.append("&dag=");
+    sb.append(String.valueOf(dagIdentifier));
     sb.append("&reduce=");
     sb.append(String.valueOf(partition));
     sb.append("&map=");

http://git-wip-us.apache.org/repos/asf/tez/blob/9ed22b2c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 600c332..5bfcab8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -403,7 +403,7 @@ public class ShuffleManager implements FetcherCallback {
     }
 
     FetcherBuilder fetcherBuilder = new FetcherBuilder(ShuffleManager.this,
-      httpConnectionParams, inputManager, inputContext.getApplicationId(),
+      httpConnectionParams, inputManager, inputContext.getApplicationId(), inputContext.getDagIdentifier(),
         jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
         lockDisk, localDiskFetchEnabled, sharedFetchEnabled,
         localhostName, shufflePort, asyncHttp);

http://git-wip-us.apache.org/repos/asf/tez/blob/9ed22b2c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index 32ac766..9481e65 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -127,7 +127,7 @@ public class ShuffleInputEventHandlerOrderedGrouped {
   @VisibleForTesting
   URI getBaseURI(String host, int port, int partitionId) {
     StringBuilder sb = ShuffleUtils.constructBaseURIForShuffleHandler(host, port,
-      partitionId, inputContext.getApplicationId().toString(), sslShuffle);
+      partitionId, inputContext.getApplicationId().toString(), inputContext.getDagIdentifier(),
sslShuffle);
     URI u = URI.create(sb.toString());
     return u;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/9ed22b2c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index 7678b18..85e3540 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -75,7 +75,7 @@ public class TestFetcher {
     final boolean DISABLE_LOCAL_FETCH = false;
 
     Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
-        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
+        ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, ENABLE_LOCAL_FETCH,
HOST,
         PORT, false);
     builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
     Fetcher fetcher = spy(builder.build());
@@ -125,7 +125,7 @@ public class TestFetcher {
     // When disabled use http fetch
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
     builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
-        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, DISABLE_LOCAL_FETCH,
HOST,
+        ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, DISABLE_LOCAL_FETCH,
HOST,
         PORT, false);
     builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
     fetcher = spy(builder.build());
@@ -159,7 +159,7 @@ public class TestFetcher {
     int partition = 42;
     FetcherCallback callback = mock(FetcherCallback.class);
     Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
-        ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, HOST, PORT, false);
+        ApplicationId.newInstance(0, 1), 1, null, "fetcherTest", conf, true, HOST, PORT,
false);
     builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts));
     Fetcher fetcher = spy(builder.build());
 


Mime
View raw message