tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-572. Implement more efficient re-use for vertices that have no locality requirements. (hitesh)
Date Tue, 22 Oct 2013 19:45:02 GMT
Updated Branches:
  refs/heads/master a6908afa8 -> d8adaf3b9


TEZ-572. Implement more efficient re-use for vertices that have no locality requirements.
(hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/d8adaf3b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/d8adaf3b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/d8adaf3b

Branch: refs/heads/master
Commit: d8adaf3b942f10c2b2bf981977ce6c87ea9472ab
Parents: a6908af
Author: Hitesh Shah <hitesh@apache.org>
Authored: Tue Oct 22 12:43:51 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Tue Oct 22 12:43:51 2013 -0700

----------------------------------------------------------------------
 .../apache/tez/dag/api/TezConfiguration.java    |  4 +-
 .../tez/dag/app/rm/TezAMRMClientAsync.java      | 72 ++++++++++++++++++--
 .../tez/dag/app/rm/TestContainerReuse.java      |  6 +-
 3 files changed, 69 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d8adaf3b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 47b18b6..0ba8f44 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -226,10 +226,8 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_LIB_URIS =
       TEZ_PREFIX + "lib.uris";
 
-  public static final String TEZ_APPLICATION_TYPE = "TEZ-MR*";
+  public static final String TEZ_APPLICATION_TYPE = "TEZ";
 
-  public static final String LOCAL_FRAMEWORK_NAME = "local-tez";
-  
   public static final String TEZ_AM_GROUPING_SPLIT_COUNT = TEZ_AM_PREFIX +
       "grouping.split-count";
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d8adaf3b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
index be2cf08..5d4e0bc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TezAMRMClientAsync.java
@@ -22,11 +22,14 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.collect.SortedMultiset;
-import com.google.common.collect.TreeMultiset;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
@@ -35,7 +38,23 @@ import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 
 public class TezAMRMClientAsync<T extends ContainerRequest> extends AMRMClientAsyncImpl<T> {
 
-  private SortedMultiset<Priority> knownPriorities = TreeMultiset.create();
+  private static final Log LOG = LogFactory.getLog(TezAMRMClientAsync.class);
+
+  /**
+   * Used to track the type of requests at a given priority.
+   */
+  private static class LocalityRequestCounter {
+    final AtomicInteger localityRequests;
+    final AtomicInteger noLocalityRequests;
+
+    public LocalityRequestCounter() {
+      this.localityRequests = new AtomicInteger(0);
+      this.noLocalityRequests = new AtomicInteger(0);
+    }
+  }
+
+  private TreeMap<Priority, LocalityRequestCounter> knownRequestsByPriority =
+    new TreeMap<Priority, LocalityRequestCounter>();
 
   public static <T extends ContainerRequest> TezAMRMClientAsync<T> createAMRMClientAsync(
       int intervalMs, CallbackHandler callbackHandler) {
@@ -56,13 +75,36 @@ public class TezAMRMClientAsync<T extends ContainerRequest> extends AMRMClientAs
   @Override
   public synchronized void addContainerRequest(T req) {
     super.addContainerRequest(req);
-    knownPriorities.add(req.getPriority());
+    boolean hasLocality = (req.getNodes() != null && !req.getNodes().isEmpty())
+      || (req.getRacks() != null && !req.getRacks().isEmpty());
+    LocalityRequestCounter lrc = knownRequestsByPriority.get(req.getPriority());
+    if (lrc == null) {
+      lrc = new LocalityRequestCounter();
+      knownRequestsByPriority.put(req.getPriority(), lrc);
+    }
+    if (hasLocality) {
+      lrc.localityRequests.incrementAndGet();
+    } else {
+      lrc.noLocalityRequests.incrementAndGet();
+    }
   }
 
   @Override
   public synchronized void removeContainerRequest(T req) {
     super.removeContainerRequest(req);
-    knownPriorities.remove(req.getPriority());
+    boolean hasLocality = (req.getNodes() != null && !req.getNodes().isEmpty())
+      || (req.getRacks() != null && !req.getRacks().isEmpty());
+    LocalityRequestCounter lrc = knownRequestsByPriority.get(
+      req.getPriority());
+    if (hasLocality) {
+      lrc.localityRequests.decrementAndGet();
+    } else {
+      lrc.noLocalityRequests.decrementAndGet();
+    }
+    if (lrc.localityRequests.get() == 0
+        && lrc.noLocalityRequests.get() == 0) {
+      knownRequestsByPriority.remove(req.getPriority());
+    }
   }
 
   public synchronized List<? extends Collection<T>>
@@ -70,12 +112,28 @@ public class TezAMRMClientAsync<T extends ContainerRequest> extends AMRMClientAs
         String resourceName, Resource capability) {
     // Sort based on reverse order. By default, Priority ordering is based on
     // highest numeric value being considered to be lowest priority.
-    Iterator<Priority> iter = knownPriorities.descendingMultiset().iterator();
+    Iterator<Priority> iter =
+      knownRequestsByPriority.descendingKeySet().iterator();
     if (!iter.hasNext()) {
       return Collections.emptyList();
     }
+    Priority p = iter.next();
+    LocalityRequestCounter lrc = knownRequestsByPriority.get(p);
+    if (lrc.localityRequests.get() == 0) {
+      // Fallback to ANY if there are no pending requests that require
+      // locality matching
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Over-ridding location request for matching containers as"
+          + " there are no pending requests that require locality at this"
+          + " priority"
+          + ", priority=" + p
+          + ", localityRequests=" + lrc.localityRequests
+          + ", noLocalityRequests=" + lrc.noLocalityRequests);
+      }
+      resourceName = ResourceRequest.ANY;
+    }
     List<? extends Collection<T>> matched =
-      getMatchingRequests(iter.next(), resourceName, capability);
+      getMatchingRequests(p, resourceName, capability);
     if (matched != null && !matched.isEmpty()) {
       return matched;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d8adaf3b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index ffa4228..5a5502d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -507,7 +507,7 @@ public class TestContainerReuse {
 
     Resource resource1 = Resource.newInstance(1024, 1);
     String [] emptyHosts = new String[0];
-    String [] emptyRacks = new String[0];
+    String [] racks = { "default-rack" };
 
     Priority priority = Priority.newInstance(3);
 
@@ -518,14 +518,14 @@ public class TestContainerReuse {
     TaskAttempt ta11 = mock(TaskAttempt.class);
     doReturn(vertexID).when(ta11).getVertexID();
     AMSchedulerEventTALaunchRequest lrEvent11 = createLaunchRequestEvent(
-      taID11, ta11, resource1, emptyHosts, emptyRacks, priority);
+      taID11, ta11, resource1, emptyHosts, racks, priority);
 
     //Vertex1, Task2, Attempt 1,  no locality information.
     TezTaskAttemptID taID12 = new TezTaskAttemptID(new TezTaskID(vertexID, 2), 1);
     TaskAttempt ta12 = mock(TaskAttempt.class);
     doReturn(vertexID).when(ta12).getVertexID();
     AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(
-      taID12, ta12, resource1, emptyHosts, emptyRacks, priority);
+      taID12, ta12, resource1, emptyHosts, racks, priority);
 
     // Send launch request for task 1 only, deterministic assignment to this task.
     taskSchedulerEventHandler.handleEvent(lrEvent11);


Mime
View raw message