tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [01/16] tez git commit: TEZ-2174. Make task priority available to TaskAttemptListener. (sseth)
Date Tue, 10 Mar 2015 07:40:58 GMT
Repository: tez
Updated Branches:
  refs/heads/TEZ-2003 45e5311d9 -> a264f2f45 (forced update)


TEZ-2174. Make task priority available to TaskAttemptListener. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/d8024195
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/d8024195
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/d8024195

Branch: refs/heads/TEZ-2003
Commit: d8024195d3eb851956cc091d2e4740c1f613b43b
Parents: 09ffc24
Author: Siddharth Seth <sseth@apache.org>
Authored: Thu Mar 5 10:50:02 2015 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Thu Mar 5 10:50:02 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../dag/app/rm/TaskSchedulerEventHandler.java   |  2 +-
 .../rm/container/AMContainerEventAssignTA.java  |  9 +-
 .../dag/app/rm/container/AMContainerImpl.java   |  3 +-
 .../dag/app/rm/container/AMContainerTask.java   |  9 +-
 .../app/TestTaskAttemptListenerImplTezDag.java  |  6 +-
 .../app/rm/TestTaskSchedulerEventHandler.java   | 97 +++++++++++++++-----
 .../dag/app/rm/container/TestAMContainer.java   |  4 +-
 8 files changed, 99 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/d8024195/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3e9f959..4e113c4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2174. Make task priority available to TaskAttemptListener.
   TEZ-2169. Add NDC context to various threads and pools.
   TEZ-2171. Remove unused metrics code.
   TEZ-2001. Support pipelined data transfer for ordered output.

http://git-wip-us.apache.org/repos/asf/tez/blob/d8024195/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 05cbc66..a240f55 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -434,7 +434,7 @@ public class TaskSchedulerEventHandler extends AbstractService
     sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container));
     sendEvent(new AMContainerEventAssignTA(containerId, taskAttempt.getID(),
         event.getRemoteTaskSpec(), event.getContainerContext().getLocalResources(), event
-            .getContainerContext().getCredentials()));
+            .getContainerContext().getCredentials(), event.getPriority()));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/d8024195/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
index a363168..682cd02 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
@@ -32,14 +32,17 @@ public class AMContainerEventAssignTA extends AMContainerEvent {
   private final TaskSpec remoteTaskSpec;
   private final Map<String, LocalResource> taskLocalResources;
   private final Credentials credentials;
+  private final int priority;
 
   public AMContainerEventAssignTA(ContainerId containerId, TezTaskAttemptID attemptId,
-      Object remoteTaskSpec, Map<String, LocalResource> taskLocalResources, Credentials
credentials) {
+      Object remoteTaskSpec, Map<String, LocalResource> taskLocalResources, Credentials
credentials,
+      int priority) {
     super(containerId, AMContainerEventType.C_ASSIGN_TA);
     this.attemptId = attemptId;
     this.remoteTaskSpec = (TaskSpec) remoteTaskSpec;
     this.taskLocalResources = taskLocalResources;
     this.credentials = credentials;
+    this.priority = priority;
   }
 
   public TaskSpec getRemoteTaskSpec() {
@@ -57,4 +60,8 @@ public class AMContainerEventAssignTA extends AMContainerEvent {
   public Credentials getCredentials() {
     return this.credentials;
   }
+
+  public int getPriority() {
+    return priority;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/d8024195/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index f72e62a..201ae9f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -565,7 +565,8 @@ public class AMContainerImpl implements AMContainer {
           "] to container: [" + container.getContainerId() + "]");
       AMContainerTask amContainerTask = new AMContainerTask(
           event.getRemoteTaskSpec(), container.additionalLocalResources,
-          container.credentialsChanged ? container.credentials : null, container.credentialsChanged);
+          container.credentialsChanged ? container.credentials : null, container.credentialsChanged,
+          event.getPriority());
       container.registerAttemptWithListener(amContainerTask);
       container.additionalLocalResources = null;
       container.credentialsChanged = false;

http://git-wip-us.apache.org/repos/asf/tez/blob/d8024195/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
index 89a434b..7b22ba6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
@@ -30,14 +30,17 @@ public class AMContainerTask {
   private final TaskSpec tezTask;
   private final Credentials credentials;
   private final boolean credentialsChanged;
+  private final int priority;
 
   public AMContainerTask(TaskSpec tezTask,
-      Map<String, LocalResource> additionalResources, Credentials credentials, boolean
credentialsChanged) {
+                         Map<String, LocalResource> additionalResources, Credentials
credentials,
+                         boolean credentialsChanged, int priority) {
     Preconditions.checkNotNull(tezTask, "TaskSpec cannot be null");
     this.tezTask = tezTask;
     this.additionalResources = additionalResources;
     this.credentials = credentials;
     this.credentialsChanged = credentialsChanged;
+    this.priority = priority;
   }
 
   public TaskSpec getTask() {
@@ -55,4 +58,8 @@ public class AMContainerTask {
   public boolean haveCredentialsChanged() {
     return this.credentialsChanged;
   }
+
+  public int getPriority() {
+    return priority;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/d8024195/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index f0f7dc5..1f5d9bb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -64,7 +64,7 @@ public class TestTaskAttemptListenerImplTezDag {
     TaskSpec taskSpec = mock(TaskSpec.class);
     TezTaskAttemptID taskAttemptId = mock(TezTaskAttemptID.class);
     doReturn(taskAttemptId).when(taskSpec).getTaskAttemptID();
-    AMContainerTask amContainerTask = new AMContainerTask(taskSpec, null, null, false);
+    AMContainerTask amContainerTask = new AMContainerTask(taskSpec, null, null, false, 0);
     ContainerTask containerTask = null;
 
 
@@ -106,7 +106,7 @@ public class TestTaskAttemptListenerImplTezDag {
     TaskSpec taskSpec2 = mock(TaskSpec.class);
     TezTaskAttemptID taskAttemptId2 = mock(TezTaskAttemptID.class);
     doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID();
-    AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec, null, null, false);
+    AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec, null, null, false, 0);
     taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3);
     taskAttemptListener.unregisterRunningContainer(containerId3);
     containerTask = taskAttemptListener.getTask(containerContext3);
@@ -134,7 +134,7 @@ public class TestTaskAttemptListenerImplTezDag {
     TaskSpec taskSpec = mock(TaskSpec.class);
     TezTaskAttemptID taskAttemptId = mock(TezTaskAttemptID.class);
     doReturn(taskAttemptId).when(taskSpec).getTaskAttemptID();
-    AMContainerTask amContainerTask = new AMContainerTask(taskSpec, null, null, false);
+    AMContainerTask amContainerTask = new AMContainerTask(taskSpec, null, null, false, 0);
     ContainerTask containerTask = null;
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/d8024195/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index 7bb9d9b..28f94a7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.dag.app.rm;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doReturn;
@@ -28,14 +30,19 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
@@ -44,13 +51,16 @@ import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.client.DAGClientServer;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
 import org.apache.tez.dag.app.dag.impl.TaskImpl;
 import org.apache.tez.dag.app.dag.impl.VertexImpl;
 import org.apache.tez.dag.app.rm.container.AMContainer;
+import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
 import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
 import org.apache.tez.dag.app.rm.container.AMContainerEventType;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
+import org.apache.tez.dag.app.rm.container.AMContainerState;
 import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
 import org.apache.tez.dag.app.web.WebUIService;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
@@ -122,7 +132,46 @@ public class TestTaskSchedulerEventHandler {
     schedulerHandler = new MockTaskSchedulerEventHandler(
         mockAppContext, mockClientService, mockEventHandler, mockSigMatcher, mockWebUIService);
   }
-  
+
+  @Test(timeout = 5000)
+  public void testSimpleAllocate() throws Exception {
+    Configuration conf = new Configuration(false);
+    schedulerHandler.init(conf);
+    schedulerHandler.start();
+
+    TaskAttemptImpl mockTaskAttempt = mock(TaskAttemptImpl.class);
+    TezTaskAttemptID mockAttemptId = mock(TezTaskAttemptID.class);
+    when(mockAttemptId.getId()).thenReturn(0);
+    when(mockTaskAttempt.getID()).thenReturn(mockAttemptId);
+    Resource resource = Resource.newInstance(1024, 1);
+    ContainerContext containerContext =
+        new ContainerContext(new HashMap<String, LocalResource>(), new Credentials(),
+            new HashMap<String, String>(), "");
+    int priority = 10;
+    TaskLocationHint locHint = TaskLocationHint.createTaskLocationHint(new HashSet<String>(),
null);
+
+    ContainerId mockCId = mock(ContainerId.class);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(mockCId);
+
+    AMContainer mockAMContainer = mock(AMContainer.class);
+    when(mockAMContainer.getContainerId()).thenReturn(mockCId);
+    when(mockAMContainer.getState()).thenReturn(AMContainerState.IDLE);
+
+    when(mockAMContainerMap.get(mockCId)).thenReturn(mockAMContainer);
+
+    AMSchedulerEventTALaunchRequest lr =
+        new AMSchedulerEventTALaunchRequest(mockAttemptId, resource, null, mockTaskAttempt,
locHint,
+            priority, containerContext);
+    schedulerHandler.taskAllocated(mockTaskAttempt, lr, container);
+    assertEquals(2, mockEventHandler.events.size());
+    assertTrue(mockEventHandler.events.get(1) instanceof AMContainerEventAssignTA);
+    AMContainerEventAssignTA assignEvent =
+        (AMContainerEventAssignTA) mockEventHandler.events.get(1);
+    assertEquals(priority, assignEvent.getPriority());
+    assertEquals(mockAttemptId, assignEvent.getTaskAttemptId());
+  }
+
   @Test (timeout = 5000)
   public void testTaskBasedAffinity() throws Exception {
     Configuration conf = new Configuration(false);
@@ -179,15 +228,15 @@ public class TestTaskSchedulerEventHandler {
     when(mockStatus.getDiagnostics()).thenReturn(diagnostics);
     when(mockStatus.getExitStatus()).thenReturn(ContainerExitStatus.PREEMPTED);
     schedulerHandler.containerCompleted(mockTask, mockStatus);
-    Assert.assertEquals(1, mockEventHandler.events.size());
+    assertEquals(1, mockEventHandler.events.size());
     Event event = mockEventHandler.events.get(0);
-    Assert.assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
+    assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
     AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
-    Assert.assertEquals(mockCId, completedEvent.getContainerId());
-    Assert.assertEquals("Container preempted externally. Container preempted by RM.", 
+    assertEquals(mockCId, completedEvent.getContainerId());
+    assertEquals("Container preempted externally. Container preempted by RM.",
         completedEvent.getDiagnostics());
-    Assert.assertTrue(completedEvent.isPreempted());
-    Assert.assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION,
+    assertTrue(completedEvent.isPreempted());
+    assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION,
         completedEvent.getTerminationCause());
     Assert.assertFalse(completedEvent.isDiskFailed());
 
@@ -205,15 +254,15 @@ public class TestTaskSchedulerEventHandler {
     verify(mockTaskScheduler, times(0)).deallocateContainer((ContainerId)any());
     schedulerHandler.preemptContainer(mockCId);
     verify(mockTaskScheduler, times(1)).deallocateContainer(mockCId);
-    Assert.assertEquals(1, mockEventHandler.events.size());
+    assertEquals(1, mockEventHandler.events.size());
     Event event = mockEventHandler.events.get(0);
-    Assert.assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
+    assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
     AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
-    Assert.assertEquals(mockCId, completedEvent.getContainerId());
-    Assert.assertEquals("Container preempted internally", completedEvent.getDiagnostics());
-    Assert.assertTrue(completedEvent.isPreempted());
+    assertEquals(mockCId, completedEvent.getContainerId());
+    assertEquals("Container preempted internally", completedEvent.getDiagnostics());
+    assertTrue(completedEvent.isPreempted());
     Assert.assertFalse(completedEvent.isDiskFailed());
-    Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
+    assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
         completedEvent.getTerminationCause());
 
     schedulerHandler.stop();
@@ -237,16 +286,16 @@ public class TestTaskSchedulerEventHandler {
     when(mockStatus.getDiagnostics()).thenReturn(diagnostics);
     when(mockStatus.getExitStatus()).thenReturn(ContainerExitStatus.DISKS_FAILED);
     schedulerHandler.containerCompleted(mockTask, mockStatus);
-    Assert.assertEquals(1, mockEventHandler.events.size());
+    assertEquals(1, mockEventHandler.events.size());
     Event event = mockEventHandler.events.get(0);
-    Assert.assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
+    assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
     AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
-    Assert.assertEquals(mockCId, completedEvent.getContainerId());
-    Assert.assertEquals("Container disk failed. NM disk failed.", 
+    assertEquals(mockCId, completedEvent.getContainerId());
+    assertEquals("Container disk failed. NM disk failed.",
         completedEvent.getDiagnostics());
     Assert.assertFalse(completedEvent.isPreempted());
-    Assert.assertTrue(completedEvent.isDiskFailed());
-    Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR,
+    assertTrue(completedEvent.isDiskFailed());
+    assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR,
         completedEvent.getTerminationCause());
 
     schedulerHandler.stop();
@@ -259,7 +308,7 @@ public class TestTaskSchedulerEventHandler {
 
     // ensure history url is empty when timeline server is not the logging class
     conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "http://ui-host:9999");
-    Assert.assertTrue("".equals(schedulerHandler.getHistoryUrl()));
+    assertTrue("".equals(schedulerHandler.getHistoryUrl()));
 
     // ensure expansion of url happens
     conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
@@ -267,24 +316,24 @@ public class TestTaskSchedulerEventHandler {
     final ApplicationId mockApplicationId = mock(ApplicationId.class);
     doReturn("TEST_APP_ID").when(mockApplicationId).toString();
     doReturn(mockApplicationId).when(mockAppContext).getApplicationID();
-    Assert.assertTrue("http://ui-host:9999/#/tez-app/TEST_APP_ID"
+    assertTrue("http://ui-host:9999/#/tez-app/TEST_APP_ID"
         .equals(schedulerHandler.getHistoryUrl()));
 
     // ensure the trailing / in history url is handled
     conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "http://ui-host:9998/");
-    Assert.assertTrue("http://ui-host:9998/#/tez-app/TEST_APP_ID"
+    assertTrue("http://ui-host:9998/#/tez-app/TEST_APP_ID"
         .equals(schedulerHandler.getHistoryUrl()));
 
     // handle bad template ex without begining /
     conf.set(TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE,
         "__HISTORY_URL_BASE__#/somepath");
-    Assert.assertTrue("http://ui-host:9998/#/somepath"
+    assertTrue("http://ui-host:9998/#/somepath"
         .equals(schedulerHandler.getHistoryUrl()));
 
     conf.set(TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE,
         "__HISTORY_URL_BASE__?viewPath=tez-app/__APPLICATION_ID__");
     conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "http://localhost/ui/tez");
-    Assert.assertTrue("http://localhost/ui/tez?viewPath=tez-app/TEST_APP_ID"
+    assertTrue("http://localhost/ui/tez?viewPath=tez-app/TEST_APP_ID"
         .equals(schedulerHandler.getHistoryUrl()));
 
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/d8024195/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index 22c0559..fafbba6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -124,6 +124,7 @@ public class TestAMContainer {
     verify(wc.tal, times(1)).registerTaskAttempt(argumentCaptor.capture(), eq(wc.containerID));
     assertEquals(1, argumentCaptor.getAllValues().size());
     assertEquals(wc.taskAttemptID, argumentCaptor.getAllValues().get(0).getTask().getTaskAttemptID());
+    assertEquals(WrappedContainer.taskPriority, argumentCaptor.getAllValues().get(0).getPriority());
 
     // Attempt succeeded
     wc.taskAttemptSucceeded(wc.taskAttemptID);
@@ -1158,6 +1159,7 @@ public class TestAMContainer {
   private static class WrappedContainer {
 
     long rmIdentifier = 2000;
+    static final int taskPriority = 10;
     ApplicationId applicationID;
     ApplicationAttemptId appAttemptID;
     ContainerId containerID;
@@ -1288,7 +1290,7 @@ public class TestAMContainer {
       reset(eventHandler);
       doReturn(taID).when(taskSpec).getTaskAttemptID();
       amContainer.handle(new AMContainerEventAssignTA(containerID, taID, taskSpec,
-          additionalResources, credentials));
+          additionalResources, credentials, taskPriority));
     }
 
     public void containerLaunched() {


Mime
View raw message