tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject tez git commit: TEZ-3115. Shuffle string handling adds significant memory overhead (jeagles)
Date Wed, 02 Mar 2016 20:05:39 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.7 f167c8575 -> 84922f834


TEZ-3115. Shuffle string handling adds significant memory overhead (jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/84922f83
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/84922f83
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/84922f83

Branch: refs/heads/branch-0.7
Commit: 84922f834248e31e681e5ad207b63c578af0be91
Parents: f167c85
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Wed Mar 2 14:05:27 2016 -0600
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Wed Mar 2 14:05:27 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../library/common/shuffle/ShuffleUtils.java    |  11 +-
 .../orderedgrouped/FetcherOrderedGrouped.java   |  20 ++-
 .../common/shuffle/orderedgrouped/MapHost.java  | 135 +++++++++++++++----
 .../common/shuffle/orderedgrouped/Shuffle.java  |   5 +-
 .../ShuffleInputEventHandlerOrderedGrouped.java |  28 ++--
 .../orderedgrouped/ShuffleScheduler.java        |  86 +++++++++---
 .../shuffle/orderedgrouped/TestFetcher.java     |  20 +--
 ...tShuffleInputEventHandlerOrderedGrouped.java |  18 +--
 9 files changed, 219 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/84922f83/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c9b13b0..b73fd72 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES:
+  TEZ-3115. Shuffle string handling adds significant memory overhead
   TEZ-3149. Tez-tools: Add username in DagInfo
   TEZ-2988. DAGAppMaster::shutdownTezAM should return with a no-op if it has been invoked earlier.
   TEZ-3147. Intermediate mem-to-mem: Fix early exit when only one segment can fit into memory

http://git-wip-us.apache.org/repos/asf/tez/blob/84922f83/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 5c9ff77..a4e95f7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -184,18 +184,13 @@ public class ShuffleUtils {
     }
   }
 
-  // TODO NEWTEZ handle ssl shuffle
   public static StringBuilder constructBaseURIForShuffleHandler(String host,
       int port, int partition, String appId, boolean sslShuffle) {
-    return constructBaseURIForShuffleHandler(host + ":" + String.valueOf(port),
-      partition, appId, sslShuffle);
-  }
-  
-  public static StringBuilder constructBaseURIForShuffleHandler(String hostIdentifier,
-      int partition, String appId, boolean sslShuffle) {
     final String http_protocol = (sslShuffle) ? "https://" : "http://";
     StringBuilder sb = new StringBuilder(http_protocol);
-    sb.append(hostIdentifier);
+    sb.append(host);
+    sb.append(":");
+    sb.append(port);
     sb.append("/");
     sb.append("mapOutput?job=");
     sb.append(appId.replace("application", "job"));

http://git-wip-us.apache.org/repos/asf/tez/blob/84922f83/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 7b666e9..385ed48 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -38,6 +38,7 @@ import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type;
@@ -72,7 +73,9 @@ class FetcherOrderedGrouped extends Thread {
   private final Shuffle shuffle;
   private final int id;
   private final String logIdentifier;
-  private final String localShuffleHostPort;
+  private final String localShuffleHost;
+  private final int localShufflePort;
+  private final String applicationId;
   private static int nextId = 0;
   private int currentPartition = -1;
 
@@ -94,6 +97,7 @@ class FetcherOrderedGrouped extends Thread {
 
   HttpConnection httpConnection;
   HttpConnectionParams httpConnectionParams;
+  private final boolean sslShuffle;
 
   // Initiative value is 0, which means it hasn't retried yet.
   private long retryStartTime = 0;
@@ -127,6 +131,7 @@ class FetcherOrderedGrouped extends Thread {
         ShuffleErrors.CONNECTION.toString());
     wrongReduceErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
         ShuffleErrors.WRONG_REDUCE.toString());
+    applicationId = inputContext.getApplicationId().toString();
 
     this.ifileReadAhead = ifileReadAhead;
     this.ifileReadAheadLength = ifileReadAheadLength;
@@ -137,9 +142,12 @@ class FetcherOrderedGrouped extends Thread {
       this.codec = null;
     }
     this.conf = conf;
-    this.localShuffleHostPort = localHostname + ":" + String.valueOf(shufflePort);
+    this.localShuffleHost = localHostname;
+    this.localShufflePort = shufflePort;
 
     this.localDiskFetchEnabled = localDiskFetchEnabled;
+    this.sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
 
     this.logIdentifier = "fetcher {" + TezUtilsInternal
         .cleanVertexName(inputContext.getSourceVertexName()) + "} #" + id;
@@ -161,8 +169,7 @@ class FetcherOrderedGrouped extends Thread {
       assignedHost = scheduler.getHost();
       metrics.threadBusy();
 
-      String hostPort = assignedHost.getHostIdentifier();
-      if (localDiskFetchEnabled && hostPort.equals(localShuffleHostPort)) {
+      if (localDiskFetchEnabled && assignedHost.getHost().equals(localShuffleHost)
&& assignedHost.getPort() == localShufflePort) {
         setupLocalDiskFetch(assignedHost);
       } else {
         // Shuffle
@@ -333,8 +340,9 @@ class FetcherOrderedGrouped extends Thread {
       throws IOException {
     boolean connectSucceeded = false;
     try {
-      URL url = ShuffleUtils.constructInputURL(host.getBaseUrl(), attempts,
-          httpConnectionParams.getKeepAlive());
+      StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host.getHost(),
+          host.getPort(), host.getPartitionId(), applicationId, sslShuffle);
+      URL url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts, httpConnectionParams.getKeepAlive());
       httpConnection = new HttpConnection(url, httpConnectionParams,
           logIdentifier, jobTokenSecretManager);
       connectSucceeded = httpConnection.connect();

http://git-wip-us.apache.org/repos/asf/tez/blob/84922f83/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
index 3116568..7f8a23c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
@@ -32,44 +32,131 @@ class MapHost {
     PENDING,            // Known map outputs which need to be fetched
     PENALIZED           // Host penalized due to shuffle failures
   }
-  
+
+  public static class HostPort {
+
+    final String host;
+    final int port;
+
+    HostPort(String host, int port) {
+      this.host = host;
+      this.port = port;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((host == null) ? 0 : host.hashCode());
+      result = prime * result + port;
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      HostPort other = (HostPort) obj;
+      if (host == null) {
+        if (other.host != null)
+          return false;
+      } else if (!host.equals(other.host))
+        return false;
+      if (port != other.port)
+        return false;
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return "HostPort [host=" + host + ", port=" + port + "]";
+    }
+  }
+
+  public static class HostPortPartition {
+
+    final String host;
+    final int port;
+    final int partition;
+
+    HostPortPartition(String host, int port, int partition) {
+      this.host = host;
+      this.port = port;
+      this.partition = partition;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((host == null) ? 0 : host.hashCode());
+      result = prime * result + partition;
+      result = prime * result + port;
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      HostPortPartition other = (HostPortPartition) obj;
+      if (partition != other.partition)
+        return false;
+      if (host == null) {
+        if (other.host != null)
+          return false;
+      } else if (!host.equals(other.host))
+        return false;
+      if (port != other.port)
+        return false;
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return "HostPortPartition [host=" + host + ", port=" + port + ", partition=" + partition
+ "]";
+    }
+  }
+
   private State state = State.IDLE;
-  private final String hostIdentifier;
-  private final int partitionId;
-  private final String baseUrl;
-  private final String identifier;
+  private final String host;
+  private final int port;
+  private final int partition;
   // Tracks attempt IDs
   private List<InputAttemptIdentifier> maps = new ArrayList<InputAttemptIdentifier>();
   
-  public MapHost(int partitionId, String hostPort, String baseUrl) {
-    this.partitionId = partitionId;
-    this.hostIdentifier = hostPort;
-    this.baseUrl = baseUrl;
-    this.identifier = createIdentifier(hostPort, partitionId);
-  }
-  
-  public static String createIdentifier(String hostName, int partitionId) {
-    return hostName + ":" + Integer.toString(partitionId);
+  public MapHost(String host, int port, int partition) {
+    this.host = host;
+    this.port = port;
+    this.partition = partition;
   }
 
-  public String getIdentifier() {
-    return identifier;
-  }
-  
   public int getPartitionId() {
-    return partitionId;
+    return partition;
   }
 
   public State getState() {
     return state;
   }
 
-  public String getHostIdentifier() {
-    return hostIdentifier;
+  public String getHost() {
+    return host;
+  }
+
+  public int getPort() {
+    return port;
   }
 
-  public String getBaseUrl() {
-    return baseUrl;
+  public String getHostIdentifier() {
+    return host + ":" + port;
   }
 
   public synchronized void addKnownMap(InputAttemptIdentifier srcAttempt) {
@@ -112,7 +199,7 @@ class MapHost {
   
   @Override
   public String toString() {
-    return hostIdentifier;
+    return getHostIdentifier();
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/84922f83/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index ca3c05d..efce6d2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -194,8 +194,6 @@ public class Shuffle implements ExceptionReporter {
         + (codec == null ? "None" : codec.getClass().getName())
         + ", ifileReadAhead: " + ifileReadAhead);
 
-    boolean sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
-      TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
     startTime = System.currentTimeMillis();
     scheduler = new ShuffleScheduler(
           this.inputContext,
@@ -231,8 +229,7 @@ public class Shuffle implements ExceptionReporter {
 
     eventHandler= new ShuffleInputEventHandlerOrderedGrouped(
         inputContext,
-        scheduler,
-        sslShuffle);
+        scheduler);
     
     ExecutorService rawExecutor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder()
         .setDaemon(true).setNameFormat("ShuffleAndMergeRunner {" + srcNameTrimmed + "}").build());

http://git-wip-us.apache.org/repos/asf/tez/blob/84922f83/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
index 7c4eb98..0b3f4fc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.java
@@ -19,16 +19,15 @@
 package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.BitSet;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.dag.api.TezUncheckedException;
@@ -49,7 +48,6 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
   private final ShuffleScheduler scheduler;
   private final InputContext inputContext;
 
-  private final boolean sslShuffle;
 
   private final AtomicInteger nextToLogEventCount = new AtomicInteger(0);
   private final AtomicInteger numDmeEvents = new AtomicInteger(0);
@@ -57,10 +55,9 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
   private final AtomicInteger numDmeEventsNoData = new AtomicInteger(0);
 
   public ShuffleInputEventHandlerOrderedGrouped(InputContext inputContext,
-                                                ShuffleScheduler scheduler, boolean sslShuffle)
{
+                                                ShuffleScheduler scheduler) {
     this.inputContext = inputContext;
     this.scheduler = scheduler;
-    this.sslShuffle = sslShuffle;
   }
 
   @Override
@@ -103,17 +100,19 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
       throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
     } 
     int partitionId = dmEvent.getSourceIndex();
+    InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dmEvent,
shufflePayload);
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("DME srcIdx: " + partitionId + ", targetIdx: " + dmEvent.getTargetIndex()
           + ", attemptNum: " + dmEvent.getVersion() + ", payload: " +
           ShuffleUtils.stringify(shufflePayload));
     }
+
     if (shufflePayload.hasEmptyPartitions()) {
       try {
         byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions());
         BitSet emptyPartitionsBitSet = TezUtilsInternal.fromByteArray(emptyPartitions);
         if (emptyPartitionsBitSet.get(partitionId)) {
-          InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dmEvent,
shufflePayload);
           if (LOG.isDebugEnabled()) {
             LOG.debug(
                 "Source partition: " + partitionId + " did not generate any data. SrcAttempt:
["
@@ -129,11 +128,8 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
       }
     }
 
-    InputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(dmEvent,
shufflePayload);
-
-    URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
-    scheduler.addKnownMapOutput(shufflePayload.getHost(), shufflePayload.getPort(),
-        partitionId, baseUri.toString(), srcAttemptIdentifier);
+    scheduler.addKnownMapOutput(StringInterner.weakIntern(shufflePayload.getHost()), shufflePayload.getPort(),
+        partitionId, srcAttemptIdentifier);
   }
   
   private void processTaskFailedEvent(InputFailedEvent ifEvent) {
@@ -144,14 +140,6 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
     }
   }
 
-  @VisibleForTesting
-  URI getBaseURI(String host, int port, int partitionId) {
-    StringBuilder sb = ShuffleUtils.constructBaseURIForShuffleHandler(host, port,
-      partitionId, inputContext.getApplicationId().toString(), sslShuffle);
-    URI u = URI.create(sb.toString());
-    return u;
-  }
-
   /**
    * Helper method to create InputAttemptIdentifier
    *
@@ -161,7 +149,7 @@ public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandl
    */
   private InputAttemptIdentifier constructInputAttemptIdentifier(DataMovementEvent dmEvent,
       DataMovementEventPayloadProto shufflePayload) {
-    String pathComponent = (shufflePayload.hasPathComponent()) ? shufflePayload.getPathComponent()
: null;
+    String pathComponent = (shufflePayload.hasPathComponent()) ? StringInterner.weakIntern(shufflePayload.getPathComponent())
: null;
     int spillEventId = shufflePayload.getSpillId();
     InputAttemptIdentifier srcAttemptIdentifier = null;
     if (shufflePayload.hasSpillId()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/84922f83/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index b8b6cf2..65b195a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -54,6 +54,8 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost.HostPort;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost.HostPortPartition;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type;
 
 import com.google.common.collect.Lists;
@@ -65,18 +67,62 @@ class ShuffleScheduler {
     }
   };
 
+  public static class PathPartition {
+
+    final String path;
+    final int partition;
+
+    PathPartition(String path, int partition) {
+      this.path = path;
+      this.partition = partition;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((path == null) ? 0 : path.hashCode());
+      result = prime * result + partition;
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      PathPartition other = (PathPartition) obj;
+      if (path == null) {
+        if (other.path != null)
+          return false;
+      } else if (!path.equals(other.path))
+        return false;
+      if (partition != other.partition)
+        return false;
+      return true;
+    }
+
+    @Override
+    public String toString() {
+      return "PathPartition [path=" + path + ", partition=" + partition + "]";
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(ShuffleScheduler.class);
   static final long INITIAL_PENALTY = 2000L; // 2 seconds
   private static final float PENALTY_GROWTH_RATE = 1.3f;
 
-  private boolean[] finishedMaps;
+  private final BitSet finishedMaps;
   private final int numInputs;
   private final String srcNameTrimmed;
   private int numFetchedSpills;
-  private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
+  private Map<HostPortPartition, MapHost> mapLocations = new HashMap<HostPortPartition,
MapHost>();
   @VisibleForTesting
-  final ConcurrentMap<String, InputAttemptIdentifier> pathToIdentifierMap
-      = new ConcurrentHashMap<String, InputAttemptIdentifier>();
+  final ConcurrentMap<PathPartition, InputAttemptIdentifier> pathToIdentifierMap
+      = new ConcurrentHashMap<PathPartition, InputAttemptIdentifier>();
 
   //To track shuffleInfo events when finalMerge is disabled in source or pipelined shuffle is
   // enabled in source.
@@ -92,9 +138,9 @@ class ShuffleScheduler {
   private final Referee referee;
   @VisibleForTesting
   final Map<InputAttemptIdentifier, IntWritable> failureCounts = new HashMap<InputAttemptIdentifier,IntWritable>();
-  final Set<String> uniqueHosts = Sets.newHashSet();
-  private final Map<String,IntWritable> hostFailures = 
-    new HashMap<String,IntWritable>();
+  final Set<HostPort> uniqueHosts = Sets.newHashSet();
+  private final Map<HostPort,IntWritable> hostFailures = 
+    new HashMap<HostPort,IntWritable>();
   private final InputContext inputContext;
   private final Shuffle shuffle;
   private final TezCounter shuffledInputsCounter;
@@ -157,7 +203,7 @@ class ShuffleScheduler {
       abortFailureLimit = abortFailureLimitConf;
     }
     remainingMaps = new AtomicInteger(numberOfInputs);
-    finishedMaps = new boolean[remainingMaps.get()]; // default init to false
+    finishedMaps = new BitSet(remainingMaps.get()); // default init to false
 
     this.minFailurePerHost = conf.getInt(
         TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST,
@@ -322,7 +368,7 @@ class ShuffleScheduler {
 
         failureCounts.remove(srcAttemptIdentifier);
         if (host != null) {
-          hostFailures.remove(host.getHostIdentifier());
+          hostFailures.remove(new HostPort(host.getHost(), host.getPort()));
         }
 
         output.commit();
@@ -536,7 +582,7 @@ class ShuffleScheduler {
   private void penalizeHost(MapHost host, int failures) {
     host.penalize();
 
-    String hostPort = host.getHostIdentifier();
+    HostPort hostPort = new HostPort(host.getHost(), host.getPort());
     // TODO TEZ-922 hostFailures isn't really used for anything apart from
     // hasFailedAcrossNodes().Factor it into error
     // reporting / potential blacklisting of hosts.
@@ -611,7 +657,7 @@ class ShuffleScheduler {
         (int) Math.ceil(numUniqueHosts * hostFailureFraction));
     int total = 0;
     boolean failedAcrossNodes = false;
-    for(String host : uniqueHosts) {
+    for(HostPort host : uniqueHosts) {
       IntWritable failures = hostFailures.get(host);
       if (failures != null && failures.get() > minFailurePerHost) {
         total++;
@@ -771,17 +817,13 @@ class ShuffleScheduler {
   public synchronized void addKnownMapOutput(String inputHostName,
                                              int port,
                                              int partitionId,
-                                             String hostUrl,
                                              InputAttemptIdentifier srcAttempt) {
-    String hostPort = (inputHostName + ":" + String.valueOf(port));
-    uniqueHosts.add(hostPort);
-    String identifier = MapHost.createIdentifier(hostPort, partitionId);
-
+    uniqueHosts.add(new HostPort(inputHostName, port));
+    HostPortPartition identifier = new HostPortPartition(inputHostName, port, partitionId);
 
     MapHost host = mapLocations.get(identifier);
     if (host == null) {
-      host = new MapHost(partitionId, hostPort, hostUrl);
-      assert identifier.equals(host.getIdentifier());
+      host = new MapHost(inputHostName, port, partitionId);
       mapLocations.put(identifier, host);
     }
 
@@ -1006,8 +1048,8 @@ class ShuffleScheduler {
     
   }
   
-  private String getIdentifierFromPathAndReduceId(String path, int reduceId) {
-    return path + "_" + reduceId;
+  private PathPartition getIdentifierFromPathAndReduceId(String path, int reduceId) {
+    return new PathPartition(path, reduceId);
   }
   
   /**
@@ -1050,13 +1092,13 @@ class ShuffleScheduler {
 
   void setInputFinished(int inputIndex) {
     synchronized(finishedMaps) {
-      finishedMaps[inputIndex] = true;
+      finishedMaps.set(inputIndex);
     }
   }
   
   boolean isInputFinished(int inputIndex) {
     synchronized (finishedMaps) {
-      return finishedMaps[inputIndex];      
+      return finishedMaps.get(inputIndex);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/84922f83/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index 8affa66..531c18b 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -45,6 +45,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -86,10 +87,11 @@ public class TestFetcher {
     InputContext inputContext = mock(InputContext.class);
     doReturn(new TezCounters()).when(inputContext).getCounters();
     doReturn("src vertex").when(inputContext).getSourceVertexName();
+    when(inputContext.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 1));
 
     final boolean ENABLE_LOCAL_FETCH = true;
     final boolean DISABLE_LOCAL_FETCH = false;
-    MapHost mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
+    MapHost mapHost = new MapHost(HOST, PORT, 0);
     FetcherOrderedGrouped
         fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null,
         false, 0, null, inputContext, conf, ENABLE_LOCAL_FETCH, HOST, PORT);
@@ -106,7 +108,7 @@ public class TestFetcher {
 
     // if hostname does not match use http
     spyFetcher = spy(fetcher);
-    mapHost = new MapHost(0, HOST + "_OTHER" + ":" + PORT, "baseurl");
+    mapHost = new MapHost(HOST + "_OTHER", PORT, 0);
     doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
     doReturn(mapHost).when(scheduler).getHost();
 
@@ -117,7 +119,7 @@ public class TestFetcher {
 
     // if port does not match use http
     spyFetcher = spy(fetcher);
-    mapHost = new MapHost(0, HOST + ":" + (PORT + 1), "baseurl");
+    mapHost = new MapHost(HOST, PORT + 1, 0);
     doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
     doReturn(mapHost).when(scheduler).getHost();
 
@@ -127,7 +129,7 @@ public class TestFetcher {
     verify(spyFetcher, times(1)).copyFromHost(mapHost);
 
     //if local fetch is not enabled
-    mapHost = new MapHost(0, HOST + ":" + PORT, "baseurl");
+    mapHost = new MapHost(HOST, PORT, 0);
     fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null,
         false, 0, null, inputContext, conf, DISABLE_LOCAL_FETCH, HOST, PORT);
     spyFetcher = spy(fetcher);
@@ -150,14 +152,14 @@ public class TestFetcher {
     InputContext inputContext = mock(InputContext.class);
     when(inputContext.getCounters()).thenReturn(new TezCounters());
     when(inputContext.getSourceVertexName()).thenReturn("");
+    when(inputContext.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 1));
 
     FetcherOrderedGrouped
         fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null,
         false, 0, null, inputContext, conf, true, HOST, PORT);
     FetcherOrderedGrouped spyFetcher = spy(fetcher);
 
-    MapHost host = new MapHost(1, HOST + ":" + PORT,
-        "http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=");
+    MapHost host = new MapHost(HOST, PORT, 1);
     List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
         new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"),
         new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"),
@@ -291,6 +293,7 @@ public class TestFetcher {
     InputContext inputContext = mock(InputContext.class);
     when(inputContext.getCounters()).thenReturn(new TezCounters());
     when(inputContext.getSourceVertexName()).thenReturn("");
+    when(inputContext.getApplicationId()).thenReturn(ApplicationId.newInstance(0, 1));
 
     HttpConnection.HttpConnectionParams httpConnectionParams =
         ShuffleUtils.constructHttpShuffleConnectionParams(conf);
@@ -299,8 +302,7 @@ public class TestFetcher {
             false, 0, null, inputContext, conf, false, HOST, PORT);
     final FetcherOrderedGrouped fetcher = spy(mockFetcher);
 
-    final MapHost host = new MapHost(1, HOST + ":" + PORT,
-        "http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=");
+    final MapHost host = new MapHost(HOST, PORT, 1);
     final List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
         new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"),
         new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"),
@@ -309,7 +311,7 @@ public class TestFetcher {
     doReturn(srcAttempts).when(scheduler).getMapsForHost(host);
     doReturn(true).when(fetcher).setupConnection(host, srcAttempts);
 
-    URL url = ShuffleUtils.constructInputURL(host.getBaseUrl(), srcAttempts, false);
+    URL url = ShuffleUtils.constructInputURL("http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=",
srcAttempts, false);
     fetcher.httpConnection = new FakeHttpConnection(url, null, "", null);
 
     doAnswer(new Answer<MapOutput>() {

http://git-wip-us.apache.org/repos/asf/tez/blob/84922f83/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
index 90fbf2f..0d268aa 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.java
@@ -157,7 +157,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
         System.currentTimeMillis(),
         "srcNameTrimmed");
     scheduler = spy(realScheduler);
-    handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler, false);
+    handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, scheduler);
     mergeManager = mock(MergeManager.class);
   }
 
@@ -171,9 +171,8 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
         new InputAttemptIdentifier(inputIdx, attemptNum,
             PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE,
0);
     handler.handleEvents(Collections.singletonList(dme1));
-    String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
     int partitionId = attemptNum;
-    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri),
eq(id1));
+    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(id1));
     verify(scheduler).pipelinedShuffleInfoEventsMap.containsKey(id1.getInputIdentifier());
 
     //Send final_update event.
@@ -182,10 +181,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
         new InputAttemptIdentifier(inputIdx, attemptNum,
             PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1);
     handler.handleEvents(Collections.singletonList(dme2));
-    baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
     partitionId = attemptNum;
     assertTrue(scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier()));
-    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri),
eq(id2));
+    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(id2));
     assertTrue(scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier()));
 
     MapHost host = scheduler.getHost();
@@ -226,7 +224,6 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     //Process attempt #1 first
     int attemptNum = 1;
     int inputIdx = 1;
-    String baseUri = handler.getBaseURI(HOST, PORT, attemptNum).toString();
 
     Event dme1 = createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0,
attemptNum);
     handler.handleEvents(Collections.singletonList(dme1));
@@ -235,7 +232,7 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
         new InputAttemptIdentifier(inputIdx, attemptNum,
             PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE,
0);
 
-    verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(baseUri),
eq(id1));
+    verify(scheduler, times(1)).addKnownMapOutput(eq(HOST), eq(PORT), eq(1), eq(id1));
     assertTrue("Shuffle info events should not be empty for pipelined shuffle",
         !scheduler.pipelinedShuffleInfoEventsMap.isEmpty());
 
@@ -261,10 +258,9 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
     handler.handleEvents(events);
     InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0,
         PATH_COMPONENT);
-    String baseUri = handler.getBaseURI(HOST, PORT, srcIdx).toString();
     int partitionId = srcIdx;
     verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId),
-        eq(baseUri), eq(expectedIdentifier));
+        eq(expectedIdentifier));
     assertTrue("Shuffle info events should be empty for regular shuffle codepath",
         scheduler.pipelinedShuffleInfoEventsMap.isEmpty());
   }
@@ -317,12 +313,10 @@ public class TestShuffleInputEventHandlerOrderedGrouped {
         false);
     events.add(dme);
     handler.handleEvents(events);
-    String baseUri = handler.getBaseURI(HOST, PORT, srcIdx).toString();
     int partitionId = srcIdx;
     InputAttemptIdentifier expectedIdentifier =
         new InputAttemptIdentifier(taskIndex, 0, PATH_COMPONENT);
-    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(baseUri),
-        eq(expectedIdentifier));
+    verify(scheduler).addKnownMapOutput(eq(HOST), eq(PORT), eq(partitionId), eq(expectedIdentifier));
   }
 
   private ByteString createEmptyPartitionByteString(int... emptyPartitions) throws IOException
{


Mime
View raw message