tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-3207. Add support for fetching multiple partitions from the same source task to UnorderedKVInput. Contributed by Ming Ma.
Date Thu, 28 Apr 2016 03:35:36 GMT
Repository: tez
Updated Branches:
  refs/heads/master 4389ce8cf -> f8e014876


TEZ-3207. Add support for fetching multiple partitions from the same
source task to UnorderedKVInput. Contributed by Ming Ma.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f8e01487
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f8e01487
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f8e01487

Branch: refs/heads/master
Commit: f8e014876dc57fbc27ed0f8280e5d5ba01a7c1e4
Parents: 4389ce8
Author: Siddharth Seth <sseth@apache.org>
Authored: Thu Apr 28 09:05:03 2016 +0530
Committer: Siddharth Seth <sseth@apache.org>
Committed: Thu Apr 28 09:05:03 2016 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../runtime/library/common/shuffle/Fetcher.java |  13 +-
 .../library/common/shuffle/HostPort.java        |  77 ++++++
 .../library/common/shuffle/InputHost.java       | 144 +++++-----
 .../common/shuffle/impl/ShuffleManager.java     |  76 +++---
 .../common/shuffle/orderedgrouped/MapHost.java  |  44 ----
 .../orderedgrouped/ShuffleScheduler.java        |   2 +-
 .../impl/TestShuffleInputEventHandlerImpl.java  |   3 +-
 .../common/shuffle/impl/TestShuffleManager.java | 261 +++++++++++++++++++
 9 files changed, 464 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/f8e01487/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a1fa718..324ca38 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.9.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-3207. Add support for fetching multiple partitions from the same source task to UnorderedKVInput.
   TEZ-3232. Disable randomFailingInputs in testFaulttolerance to unblock other tests.
   TEZ-3219. Allow service plugins to define log locations link for remotely run task attempts.
   TEZ-3224. User payload is not initialized before creating vertex manager plugin.

http://git-wip-us.apache.org/repos/asf/tez/blob/f8e01487/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 261f2e7..d445587 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -102,8 +102,19 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
   // Parameters to track work.
   private List<InputAttemptIdentifier> srcAttempts;
   @VisibleForTesting
+  public List<InputAttemptIdentifier> getSrcAttempts() {
+    return srcAttempts;
+  }
+
+  @VisibleForTesting
   Map<String, InputAttemptIdentifier> srcAttemptsRemaining;
+
   private String host;
+  @VisibleForTesting
+  public String getHost() {
+    return host;
+  }
+
   private int port;
   private int partition;
 
@@ -182,7 +193,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
   }
 
   @Override
-  protected FetchResult callInternal() throws Exception {
+  public FetchResult callInternal() throws Exception {
     boolean multiplex = (this.sharedFetchEnabled && this.localDiskFetchEnabled);
 
     if (srcAttempts.size() == 0) {

http://git-wip-us.apache.org/repos/asf/tez/blob/f8e01487/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HostPort.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HostPort.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HostPort.java
new file mode 100644
index 0000000..cac9d9a
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HostPort.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.shuffle;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+@Private
+public class HostPort {
+
+  private final String host;
+  private final int port;
+
+  public HostPort(String host, int port) {
+    this.host = host;
+    this.port = port;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((host == null) ? 0 : host.hashCode());
+    result = prime * result + port;
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    HostPort other = (HostPort) obj;
+    if (host == null) {
+      if (other.host != null)
+        return false;
+    } else if (!host.equals(other.host))
+      return false;
+    if (port != other.port)
+      return false;
+    return true;
+  }
+
+  public String getHost() {
+    return host;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public String getIdentifier() {
+    return host + ":" + port;
+  }
+
+  @Override
+  public String toString() {
+    return host + ":" + port;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/f8e01487/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java
index b3382ea..a447d83 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/InputHost.java
@@ -19,50 +19,30 @@
 package org.apache.tez.runtime.library.common.shuffle;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 
 /**
  * Represents a Host with respect to the MapReduce ShuffleHandler.
  * 
- * srcPhysicalIndex / partition is part of this since that only knows how to
- * serve ine partition at a time.
  */
-public class InputHost {
+public class InputHost extends HostPort {
 
-  private final String host;
-  private final int port;
-  private final int srcPhysicalIndex;
-  private final String identifier;
   private String additionalInfo;
 
-  private final BlockingQueue<InputAttemptIdentifier> inputs = new LinkedBlockingQueue<InputAttemptIdentifier>();
+  // Each input host can support more than one partition.
+  // Each partition has a list of inputs for pipelined shuffle.
+  private final Map<Integer, BlockingQueue<InputAttemptIdentifier>>
+      partitionToInputs = new ConcurrentHashMap<>();
 
-  public static String createIdentifier(String host, int port) {
-    return (host + ":" + String.valueOf(port));
-  }
-  
-  public InputHost(String hostName, int port, ApplicationId appId, int srcPhysicalIndex)
{
-    this.host = hostName;
-    this.port = port;
-    this.srcPhysicalIndex = srcPhysicalIndex;
-    this.identifier = createIdentifier(hostName, port);
-  }
-
-  public String getHost() {
-    return this.host;
-  }
-
-  public int getPort() {
-    return this.port;
-  }
-  
-  public String getIdentifier() {
-    return this.identifier;
+  public InputHost(HostPort hostPort) {
+    super(hostPort.getHost(), hostPort.getPort());
   }
 
   public void setAdditionalInfo(String additionalInfo) {
@@ -73,70 +53,76 @@ public class InputHost {
     return (additionalInfo == null) ? "" : additionalInfo;
   }
 
-  public int getSrcPhysicalIndex() {
-    return this.srcPhysicalIndex;
+  public int getNumPendingPartitions() {
+    return partitionToInputs.size();
   }
 
-  public int getNumPendingInputs() {
-    return inputs.size();
-  }
-  
-  public void addKnownInput(InputAttemptIdentifier srcAttempt) {
+  public void addKnownInput(Integer partition,
+      InputAttemptIdentifier srcAttempt) {
+    BlockingQueue<InputAttemptIdentifier> inputs =
+        partitionToInputs.get(partition);
+    if (inputs == null) {
+      inputs = new LinkedBlockingQueue<InputAttemptIdentifier>();
+      partitionToInputs.put(partition, inputs);
+    }
     inputs.add(srcAttempt);
   }
 
-  public List<InputAttemptIdentifier> clearAndGetPendingInputs() {
-    List<InputAttemptIdentifier> inputsCopy = new ArrayList<InputAttemptIdentifier>(
-        inputs.size());
-    inputs.drainTo(inputsCopy);
-    return inputsCopy;
+  public PartitionToInputs clearAndGetOnePartition() {
+    for (Map.Entry<Integer, BlockingQueue<InputAttemptIdentifier>> entry :
+        partitionToInputs.entrySet()) {
+      List<InputAttemptIdentifier> inputs =
+          new ArrayList<InputAttemptIdentifier>(entry.getValue().size());
+      entry.getValue().drainTo(inputs);
+      PartitionToInputs ret = new PartitionToInputs(entry.getKey(), inputs);
+      partitionToInputs.remove(entry.getKey());
+      return ret;
+    }
+    return null;
+  }
+
+  public String toDetailedString() {
+    return "HostPort=" + super.toString() + ", InputDetails=" +
+        partitionToInputs;
+  }
+  
+  @Override
+  public String toString() {
+    return "HostPort=" + super.toString() + ", PartitionIds=" +
+        partitionToInputs.keySet();
   }
 
   @Override
   public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((host == null) ? 0 : host.hashCode());
-    result = prime * result + port;
-    result = prime * result + srcPhysicalIndex;
-    return result;
+    return super.hashCode();
   }
 
   @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null) {
-      return false;
-    }
-    if (getClass() != obj.getClass()) {
-      return false;
+  public boolean equals(Object to) {
+    return super.equals(to);
+  }
+
+  public static class PartitionToInputs {
+    private int partition;
+    private List<InputAttemptIdentifier> inputs;
+
+    public PartitionToInputs(int partition,
+        List<InputAttemptIdentifier> input) {
+      this.partition = partition;
+      this.inputs = input;
     }
-    InputHost other = (InputHost) obj;
-    if (host == null) {
-      if (other.host != null) {
-        return false;
-      }
-    } else if (!host.equals(other.host))
-      return false;
-    if (port != other.port) {
-      return false;
+
+    public int getPartition() {
+      return partition;
     }
-    if (srcPhysicalIndex != other.srcPhysicalIndex) {
-      return false;
+
+    public List<InputAttemptIdentifier> getInputs() {
+      return inputs;
     }
-    return true;
-  }
 
-  public String toDetailedString() {
-    return "InputHost [host=" + host + ", port=" + port + ",srcPhysicalIndex=" + srcPhysicalIndex
-        + ", inputs=" + inputs + "]";
-  }
-  
-  @Override
-  public String toString() {
-    return "InputHost [host=" + host + ", port=" + port + ", srcPhysicalIndex=" + srcPhysicalIndex
-        + "]";
+    @Override
+    public String toString() {
+      return "partition=" + partition + ", inputs=" + inputs;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/f8e01487/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index b82098e..7ca9a1f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -71,13 +71,15 @@ import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.shuffle.FetchResult;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput.Type;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
 import org.apache.tez.runtime.library.common.shuffle.Fetcher;
+import org.apache.tez.runtime.library.common.shuffle.Fetcher.FetcherBuilder;
 import org.apache.tez.runtime.library.common.shuffle.FetcherCallback;
+import org.apache.tez.runtime.library.common.shuffle.HostPort;
 import org.apache.tez.runtime.library.common.shuffle.InputHost;
+import org.apache.tez.runtime.library.common.shuffle.InputHost.PartitionToInputs;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
-import org.apache.tez.runtime.library.common.shuffle.FetchedInput.Type;
-import org.apache.tez.runtime.library.common.shuffle.Fetcher.FetcherBuilder;
 
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
@@ -104,15 +106,17 @@ public class ShuffleManager implements FetcherCallback {
 
   private final FetchedInputAllocator inputManager;
 
-  private final ListeningExecutorService fetcherExecutor;
+  @VisibleForTesting
+  final ListeningExecutorService fetcherExecutor;
 
   private final ListeningExecutorService schedulerExecutor;
   private final RunShuffleCallable schedulerCallable;
   
   private final BlockingQueue<FetchedInput> completedInputs;
   private final AtomicBoolean inputReadyNotificationSent = new AtomicBoolean(false);
-  private final Set<Integer> completedInputSet;
-  private final ConcurrentMap<String, InputHost> knownSrcHosts;
+  @VisibleForTesting
+  final Set<Integer> completedInputSet;
+  private final ConcurrentMap<HostPort, InputHost> knownSrcHosts;
   private final BlockingQueue<InputHost> pendingHosts;
   private final Set<InputAttemptIdentifier> obsoletedInputs;
   private Set<Fetcher> runningFetchers;
@@ -211,7 +215,7 @@ public class ShuffleManager implements FetcherCallback {
      * We do not know upfront the number of spills from source.
      */
     completedInputs = new LinkedBlockingDeque<FetchedInput>();
-    knownSrcHosts = new ConcurrentHashMap<String, InputHost>();
+    knownSrcHosts = new ConcurrentHashMap<HostPort, InputHost>();
     pendingHosts = new LinkedBlockingQueue<InputHost>();
     obsoletedInputs = Collections.newSetFromMap(new ConcurrentHashMap<InputAttemptIdentifier,
Boolean>());
     runningFetchers = Collections.newSetFromMap(new ConcurrentHashMap<Fetcher, Boolean>());
@@ -336,13 +340,15 @@ public class ShuffleManager implements FetcherCallback {
                 }
               }
               if (LOG.isDebugEnabled()) {
-                LOG.debug(srcNameTrimmed + ": " + "Processing pending host: " + inputHost.toDetailedString());
+                LOG.debug(srcNameTrimmed + ": " + "Processing pending host: " +
+                    inputHost.toDetailedString());
               }
-              if (inputHost.getNumPendingInputs() > 0 && !isShutdown.get()) {
+              if (inputHost.getNumPendingPartitions() > 0 && !isShutdown.get())
{
                 Fetcher fetcher = constructFetcherForHost(inputHost, conf);
                 runningFetchers.add(fetcher);
                 if (isShutdown.get()) {
-                  LOG.info(srcNameTrimmed + ": " + "hasBeenShutdown, Breaking out of ShuffleScheduler
Loop");
+                  LOG.info(srcNameTrimmed + ": " + "hasBeenShutdown," +
+                      "Breaking out of ShuffleScheduler Loop");
                   break;
                 }
                 ListenableFuture<FetchResult> future = fetcherExecutor
@@ -353,8 +359,9 @@ public class ShuffleManager implements FetcherCallback {
                 }
               } else {
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug(srcNameTrimmed + ": " + "Skipping host: " + inputHost.getIdentifier()
-                      + " since it has no inputs to process");
+                  LOG.debug(srcNameTrimmed + ": " + "Skipping host: " +
+                      inputHost.getIdentifier() +
+                      " since it has no inputs to process");
                 }
               }
             }
@@ -389,8 +396,9 @@ public class ShuffleManager implements FetcherCallback {
     }
     return true;
   }
-  
-  private Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) {
+
+  @VisibleForTesting
+  Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) {
 
     Path lockDisk = null;
 
@@ -413,11 +421,12 @@ public class ShuffleManager implements FetcherCallback {
 
     // Remove obsolete inputs from the list being given to the fetcher. Also
     // remove from the obsolete list.
-    List<InputAttemptIdentifier> pendingInputsForHost = inputHost
-        .clearAndGetPendingInputs();
+    PartitionToInputs pendingInputsOfOnePartition = inputHost
+        .clearAndGetOnePartition();
     int includedMaps = 0;
-    for (Iterator<InputAttemptIdentifier> inputIter = pendingInputsForHost
-        .iterator(); inputIter.hasNext();) {
+    for (Iterator<InputAttemptIdentifier> inputIter =
+        pendingInputsOfOnePartition.getInputs().iterator();
+            inputIter.hasNext();) {
       InputAttemptIdentifier input = inputIter.next();
 
       //For pipelined shuffle.
@@ -439,20 +448,23 @@ public class ShuffleManager implements FetcherCallback {
       // Check if max threshold is met
       if (includedMaps >= maxTaskOutputAtOnce) {
         inputIter.remove();
-        inputHost.addKnownInput(input); //add to inputHost
+        //add to inputHost
+        inputHost.addKnownInput(pendingInputsOfOnePartition.getPartition(),
+            input);
       } else {
         includedMaps++;
       }
     }
-    if (inputHost.getNumPendingInputs() > 0) {
+    if (inputHost.getNumPendingPartitions() > 0) {
       pendingHosts.add(inputHost); //add it to queue
     }
     fetcherBuilder.assignWork(inputHost.getHost(), inputHost.getPort(),
-        inputHost.getSrcPhysicalIndex(), pendingInputsForHost);
+        pendingInputsOfOnePartition.getPartition(),
+            pendingInputsOfOnePartition.getInputs());
     if (LOG.isDebugEnabled()) {
       LOG.debug("Created Fetcher for host: " + inputHost.getHost()
           + ", info: " + inputHost.getAdditionalInfo()
-          + ", with inputs: " + pendingInputsForHost);
+          + ", with inputs: " + pendingInputsOfOnePartition);
     }
     return fetcherBuilder.build();
   }
@@ -461,18 +473,18 @@ public class ShuffleManager implements FetcherCallback {
   
   public void addKnownInput(String hostName, int port,
       InputAttemptIdentifier srcAttemptIdentifier, int srcPhysicalIndex) {
-    String identifier = InputHost.createIdentifier(hostName, port);
+    HostPort identifier = new HostPort(hostName, port);
     InputHost host = knownSrcHosts.get(identifier);
     if (host == null) {
-      host = new InputHost(hostName, port, inputContext.getApplicationId(), srcPhysicalIndex);
-      assert identifier.equals(host.getIdentifier());
+      host = new InputHost(identifier);
       InputHost old = knownSrcHosts.putIfAbsent(identifier, host);
       if (old != null) {
         host = old;
       }
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug(srcNameTrimmed + ": " + "Adding input: " + srcAttemptIdentifier + ", to host:
" + host);
+      LOG.debug(srcNameTrimmed + ": " + "Adding input: " +
+          srcAttemptIdentifier + ", to host: " + host);
     }
 
     if (!validateInputAttemptForPipelinedShuffle(srcAttemptIdentifier)) {
@@ -484,12 +496,13 @@ public class ShuffleManager implements FetcherCallback {
       shuffleInfoEventsMap.put(inputIdentifier, new ShuffleEventInfo(srcAttemptIdentifier));
     }
 
-    host.addKnownInput(srcAttemptIdentifier);
+    host.addKnownInput(srcPhysicalIndex, srcAttemptIdentifier);
     lock.lock();
     try {
       boolean added = pendingHosts.offer(host);
       if (!added) {
-        String errorMessage = "Unable to add host: " + host.getIdentifier() + " to pending
queue";
+        String errorMessage = "Unable to add host: " +
+            host.getIdentifier() + " to pending queue";
         LOG.error(errorMessage);
         throw new TezUncheckedException(errorMessage);
       }
@@ -865,7 +878,8 @@ public class ShuffleManager implements FetcherCallback {
    * Fake input that is added to the completed input list in case an input does not have any data.
    *
    */
-  private static class NullFetchedInput extends FetchedInput {
+  @VisibleForTesting
+  static class NullFetchedInput extends FetchedInput {
 
     public NullFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
       super(Type.MEMORY, -1, -1, inputAttemptIdentifier, null);
@@ -966,10 +980,12 @@ public class ShuffleManager implements FetcherCallback {
       } else {
         Iterable<InputAttemptIdentifier> pendingInputs = result.getPendingInputs();
         if (pendingInputs != null && pendingInputs.iterator().hasNext()) {
-          InputHost inputHost = knownSrcHosts.get(InputHost.createIdentifier(result.getHost(),
result.getPort()));
+          HostPort identifier = new HostPort(result.getHost(),
+              result.getPort());
+          InputHost inputHost = knownSrcHosts.get(identifier);
           assert inputHost != null;
           for (InputAttemptIdentifier input : pendingInputs) {
-            inputHost.addKnownInput(input);
+            inputHost.addKnownInput(result.getPartition(), input);
           }
           inputHost.setAdditionalInfo(result.getAdditionalInfo());
           pendingHosts.add(inputHost);

http://git-wip-us.apache.org/repos/asf/tez/blob/f8e01487/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
index 486d8c5..c2cfd06 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MapHost.java
@@ -33,50 +33,6 @@ class MapHost {
     PENALIZED           // Host penalized due to shuffle failures
   }
 
-  public static class HostPort {
-
-    final String host;
-    final int port;
-
-    HostPort(String host, int port) {
-      this.host = host;
-      this.port = port;
-    }
-
-    @Override
-    public int hashCode() {
-      final int prime = 31;
-      int result = 1;
-      result = prime * result + ((host == null) ? 0 : host.hashCode());
-      result = prime * result + port;
-      return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj)
-        return true;
-      if (obj == null)
-        return false;
-      if (getClass() != obj.getClass())
-        return false;
-      HostPort other = (HostPort) obj;
-      if (host == null) {
-        if (other.host != null)
-          return false;
-      } else if (!host.equals(other.host))
-        return false;
-      if (port != other.port)
-        return false;
-      return true;
-    }
-
-    @Override
-    public String toString() {
-      return "HostPort [host=" + host + ", port=" + port + "]";
-    }
-  }
-
   public static class HostPortPartition {
 
     final String host;

http://git-wip-us.apache.org/repos/asf/tez/blob/f8e01487/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 2f6e490..0a2b730 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -73,7 +73,7 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
-import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost.HostPort;
+import org.apache.tez.runtime.library.common.shuffle.HostPort;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost.HostPortPartition;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Type;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/f8e01487/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
index 0294bd3..6bcbeb6 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.java
@@ -18,7 +18,6 @@
 
 package org.apache.tez.runtime.library.common.shuffle.impl;
 
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
@@ -334,7 +333,7 @@ public class TestShuffleInputEventHandlerImpl {
     DataMovementEventPayloadProto.Builder builder = DataMovementEventPayloadProto.newBuilder();
     builder.setHost(HOST);
     builder.setPort(PORT);
-    builder.setPathComponent("attempttmp");
+    builder.setPathComponent(PATH_COMPONENT);
     if (emptyPartitionByteString != null) {
       builder.setEmptyPartitions(emptyPartitionByteString);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/f8e01487/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
new file mode 100644
index 0000000..a5608ef
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleManager.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.common.shuffle.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.TezRuntimeFrameworkConfigs;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.ExecutionContext;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
+import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
+import org.apache.tez.runtime.library.common.shuffle.Fetcher;
+import org.apache.tez.runtime.library.common.shuffle.FetchResult;
+import org.apache.tez.runtime.library.common.shuffle.InputHost;
+import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+
+/**
+ * Tests for {@link ShuffleManager} driven through
+ * {@link ShuffleInputEventHandlerImpl}. Fetchers are spied so that
+ * {@code Fetcher.callInternal()} never runs its real fetch logic; instead it
+ * reports every assigned source attempt as successfully fetched.
+ */
+public class TestShuffleManager {
+
+  private static final String FETCHER_HOST = "localhost";
+  private static final int PORT = 8080;
+  private static final String PATH_COMPONENT = "attempttmp";
+  private final Configuration conf = new Configuration();
+
+  /**
+   * One reducer fetches multiple partitions from each mapper.
+   * For each mapper, the test delivers DataMovementEvents for the first
+   * {@code firstPart} partitions, waits briefly, and then delivers events for
+   * the remaining partitions before moving on to the next mapper.
+   * Verifies that ShuffleManager completes every input and shuts down its
+   * fetcher executor.
+   */
+  @Test(timeout = 50000)
+  public void testMultiplePartitions() throws Exception {
+    final int numOfMappers = 3;
+    final int numOfPartitions = 5;
+    final int firstPart = 2;
+    final int expectedInputs = numOfMappers * numOfPartitions;
+    InputContext inputContext = createInputContext();
+    ShuffleManagerForTest shuffleManager = createShuffleManager(inputContext,
+        expectedInputs);
+    FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
+
+    ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(
+        inputContext, shuffleManager, inputAllocator, null, false, 0);
+    shuffleManager.run();
+
+    List<Event> eventList = new LinkedList<Event>();
+
+    int targetIndex = 0; // The physical input index within the reduce task
+
+    for (int i = 0; i < numOfMappers; i++) {
+      String mapperHost = "host" + i;
+      // The physical output index within the map task; 20 is an arbitrary
+      // starting offset.
+      int srcIndex = 20;
+
+      // Send the first batch of DataMovementEvents
+      eventList.clear();
+      for (int j = 0; j < firstPart; j++) {
+        Event dme = createDataMovementEvent(mapperHost, srcIndex++,
+            targetIndex++);
+        eventList.add(dme);
+      }
+      handler.handleEvents(eventList);
+
+      // Give the manager a chance to start fetching from this host before
+      // the remaining partitions for the same host arrive.
+      Thread.sleep(500);
+
+      // Send the second batch of DataMovementEvents
+      eventList.clear();
+      for (int j = 0; j < numOfPartitions - firstPart; j++) {
+        Event dme = createDataMovementEvent(mapperHost, srcIndex++,
+            targetIndex++);
+        eventList.add(dme);
+      }
+      handler.handleEvents(eventList);
+    }
+
+    // Poll for up to ~10 seconds (100 x 100 ms) until every input has
+    // completed and the fetcher executor has shut down.
+    int waitCount = 100;
+    while (waitCount-- > 0 &&
+        !(shuffleManager.isFetcherExecutorShutdown() &&
+            expectedInputs == shuffleManager.getNumOfCompletedInputs())) {
+      Thread.sleep(100);
+    }
+    assertTrue(shuffleManager.isFetcherExecutorShutdown());
+    assertEquals(expectedInputs, shuffleManager.getNumOfCompletedInputs());
+  }
+
+  /**
+   * Builds a mocked {@link InputContext} that reports {@link #FETCHER_HOST}
+   * as the local host name, exposes {@link #PORT} (serialized as an int) as
+   * the shuffle handler's service-provider metadata, and returns fresh
+   * counters and a fixed source vertex name.
+   */
+  private InputContext createInputContext() throws IOException {
+    DataOutputBuffer port_dob = new DataOutputBuffer();
+    port_dob.writeInt(PORT);
+    final ByteBuffer shuffleMetaData = ByteBuffer.wrap(port_dob.getData(), 0,
+        port_dob.getLength());
+
+    ExecutionContext executionContext = mock(ExecutionContext.class);
+    doReturn(FETCHER_HOST).when(executionContext).getHostName();
+
+    InputContext inputContext = mock(InputContext.class);
+    doReturn(new TezCounters()).when(inputContext).getCounters();
+    doReturn("sourceVertex").when(inputContext).getSourceVertexName();
+    doReturn(shuffleMetaData).when(inputContext)
+        .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+    doReturn(executionContext).when(inputContext).getExecutionContext();
+    return inputContext;
+  }
+
+  /**
+   * Creates a {@link ShuffleManagerForTest} expecting
+   * {@code expectedNumOfPhysicalInputs} inputs. As a side effect it stubs the
+   * work dirs and the serialized job token on {@code inputContext} and points
+   * {@link TezRuntimeFrameworkConfigs#LOCAL_DIRS} in {@link #conf} at those
+   * work dirs.
+   */
+  @SuppressWarnings("unchecked") // raw Token constructor below
+  private ShuffleManagerForTest createShuffleManager(
+      InputContext inputContext, int expectedNumOfPhysicalInputs)
+          throws IOException {
+    Path outDirBase = new Path(".", "outDir");
+    String[] outDirs = new String[] { outDirBase.toString() };
+    doReturn(outDirs).when(inputContext).getWorkDirs();
+    conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS,
+        inputContext.getWorkDirs());
+
+    DataOutputBuffer out = new DataOutputBuffer();
+    Token<JobTokenIdentifier> token = new Token(new JobTokenIdentifier(),
+        new JobTokenSecretManager(null));
+    token.write(out);
+    // Expose only the bytes actually written: getData() returns the whole
+    // backing array, which is valid only up to getLength().
+    doReturn(ByteBuffer.wrap(out.getData(), 0, out.getLength()))
+        .when(inputContext)
+        .getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID);
+
+    FetchedInputAllocator inputAllocator = mock(FetchedInputAllocator.class);
+    return new ShuffleManagerForTest(inputContext, conf,
+        expectedNumOfPhysicalInputs, 1024, false, -1, null, inputAllocator);
+  }
+
+  /**
+   * Builds a DataMovementEvent whose payload points at {@code host}:{@link #PORT}
+   * with {@link #PATH_COMPONENT}, for the given source and target indices.
+   */
+  private Event createDataMovementEvent(String host, int srcIndex, int targetIndex) {
+    DataMovementEventPayloadProto.Builder builder =
+        DataMovementEventPayloadProto.newBuilder();
+    builder.setHost(host);
+    builder.setPort(PORT);
+    builder.setPathComponent(PATH_COMPONENT);
+    Event dme = DataMovementEvent
+        .create(srcIndex, targetIndex, 0,
+            builder.build().toByteString().asReadOnlyByteBuffer());
+    return dme;
+  }
+
+  /**
+   * ShuffleManager whose fetchers do not fetch. Each fetcher immediately
+   * reports all of its source attempts as succeeded. The class also exposes
+   * internal state for assertions.
+   */
+  private static class ShuffleManagerForTest extends ShuffleManager {
+    public ShuffleManagerForTest(InputContext inputContext, Configuration conf,
+        int numInputs, int bufferSize, boolean ifileReadAheadEnabled,
+        int ifileReadAheadLength, CompressionCodec codec,
+        FetchedInputAllocator inputAllocator) throws IOException {
+      super(inputContext, conf, numInputs, bufferSize, ifileReadAheadEnabled,
+          ifileReadAheadLength, codec, inputAllocator);
+    }
+
+    @Override
+    Fetcher constructFetcherForHost(InputHost inputHost, Configuration conf) {
+      final Fetcher fetcher = spy(super.constructFetcherForHost(inputHost,
+          conf));
+      final FetchResult mockFetcherResult = mock(FetchResult.class);
+      try {
+        // Replace the real fetch with one that reports every assigned source
+        // attempt as successfully fetched, with zero bytes and duration.
+        doAnswer(new Answer<FetchResult>() {
+          @Override
+          public FetchResult answer(InvocationOnMock invocation)
+              throws Throwable {
+            for (InputAttemptIdentifier input : fetcher.getSrcAttempts()) {
+              ShuffleManagerForTest.this.fetchSucceeded(
+                  fetcher.getHost(), input, new TestFetchedInput(input), 0, 0,
+                  0);
+            }
+            return mockFetcherResult;
+          }
+        }).when(fetcher).callInternal();
+      } catch (Exception e) {
+        // The try/catch is presumably needed because callInternal() declares
+        // a checked exception. The call above only hits Mockito's stubbing
+        // proxy, so reaching this means the stub setup failed. Fail loudly
+        // rather than silently leave the real fetch logic in place.
+        throw new RuntimeException("Failed to stub Fetcher.callInternal()", e);
+      }
+      return fetcher;
+    }
+
+    /** Returns the number of inputs ShuffleManager has recorded as completed. */
+    public int getNumOfCompletedInputs() {
+      return completedInputSet.size();
+    }
+
+    /** Returns whether the manager's fetcher executor has been shut down. */
+    boolean isFetcherExecutorShutdown() {
+      return fetcherExecutor.isShutdown();
+    }
+  }
+
+  /**
+   * Minimal {@link FetchedInput} that the stubbed fetcher hands to
+   * {@code fetchSucceeded}. It carries no data: the stream accessors return
+   * {@code null} and commit/abort/free are no-ops.
+   */
+  @VisibleForTesting
+  static class TestFetchedInput extends FetchedInput {
+
+    public TestFetchedInput(InputAttemptIdentifier inputAttemptIdentifier) {
+      // NOTE(review): -1 sizes and a null callback are presumably fine because
+      // no data is ever written — confirm against FetchedInput's constructor.
+      super(Type.MEMORY, -1, -1, inputAttemptIdentifier, null);
+    }
+
+    @Override
+    public OutputStream getOutputStream() throws IOException {
+      return null;
+    }
+
+    @Override
+    public InputStream getInputStream() throws IOException {
+      return null;
+    }
+
+    @Override
+    public void commit() throws IOException {
+    }
+
+    @Override
+    public void abort() throws IOException {
+    }
+
+    @Override
+    public void free() {
+    }
+  }
+}


Mime
View raw message