tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeag...@apache.org
Subject [47/50] [abbrv] tez git commit: Merge branch 'TEZ-3334' into TEZ-3334-MERGE1
Date Wed, 24 May 2017 21:08:16 GMT
Merge branch 'TEZ-3334' into TEZ-3334-MERGE1


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/251ca1c3
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/251ca1c3
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/251ca1c3

Branch: refs/heads/master
Commit: 251ca1c3600f5ecbcf65eb8f367988c44c86a97e
Parents: 651257f 8e85c46
Author: Jonathan Eagles <jeagles@yahoo-inc.com>
Authored: Fri May 19 14:56:37 2017 -0500
Committer: Jonathan Eagles <jeagles@yahoo-inc.com>
Committed: Fri May 19 14:56:37 2017 -0500

----------------------------------------------------------------------
 TEZ-3334-CHANGES.txt                            |  2 +
 .../apache/tez/common/DagContainerLauncher.java | 43 ++++++++++++++++++++
 .../app/launcher/ContainerLauncherWrapper.java  |  8 ++--
 .../tez/dag/app/launcher/DagDeleteRunnable.java | 29 +++++++++----
 .../tez/dag/app/launcher/DeletionTracker.java   |  6 +--
 .../dag/app/launcher/DeletionTrackerImpl.java   | 24 +++++++----
 .../app/launcher/LocalContainerLauncher.java    | 25 ++++--------
 .../app/launcher/TezContainerLauncherImpl.java  | 12 +++---
 .../app/rm/container/AMContainerHelpers.java    |  4 +-
 .../dag/app/rm/container/AMContainerImpl.java   |  6 ++-
 .../dag/app/rm/container/AMContainerMap.java    | 11 +++--
 .../dag/app/rm/container/TestAMContainer.java   |  4 +-
 .../app/rm/container/TestAMContainerMap.java    |  4 +-
 13 files changed, 117 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/251ca1c3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
----------------------------------------------------------------------
diff --cc tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 31a5d92,f959a7c..cef1c30
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@@ -158,10 -159,8 +158,8 @@@ public class AMContainerHelpers 
      ContainerLaunchContext commonContainerSpec = null;
      synchronized (commonContainerSpecLock) {
        if (!commonContainerSpecs.containsKey(tezDAGID)) {
-         String auxiliaryService = conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
-             TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
          commonContainerSpec =
 -            createCommonContainerLaunchContext(acls, credentials, commonDAGLRs, auxiliaryService);
 +            createCommonContainerLaunchContext(acls, credentials, localResources, auxiliaryService);
          commonContainerSpecs.put(tezDAGID, commonContainerSpec);
        } else {
          commonContainerSpec = commonContainerSpecs.get(tezDAGID);

http://git-wip-us.apache.org/repos/asf/tez/blob/251ca1c3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --cc tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 18e72a7,8e7df32..02243b8
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@@ -335,19 -329,8 +336,20 @@@ public class AMContainerImpl implement
      this.schedulerId = schedulerId;
      this.launcherId = launcherId;
      this.taskCommId = taskCommId;
 -    this.stateMachine = stateMachineFactory.make(this);
+     this.auxiliaryService = auxiliaryService;
 +    this.stateMachine = new StateMachineTez<>(stateMachineFactory.make(this), this);
 +    augmentStateMachine();
 +  }
 +
 +
 +  private void augmentStateMachine() {
 +    stateMachine
 +        .registerStateEnteredCallback(AMContainerState.STOP_REQUESTED,
 +            NON_RUNNING_STATE_ENTERED_CALLBACK)
 +        .registerStateEnteredCallback(AMContainerState.STOPPING,
 +            NON_RUNNING_STATE_ENTERED_CALLBACK)
 +        .registerStateEnteredCallback(AMContainerState.COMPLETED,
 +            NON_RUNNING_STATE_ENTERED_CALLBACK);
    }
  
    @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/251ca1c3/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
----------------------------------------------------------------------
diff --cc tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
index 050ffb6,1b2fe16..15338e3
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerMap.java
@@@ -44,8 -42,8 +45,9 @@@ public class AMContainerMap extends Abs
    private final TaskCommunicatorManagerInterface tal;
    private final AppContext context;
    private final ContainerSignatureMatcher containerSignatureMatcher;
 -  private final ConcurrentHashMap<ContainerId, AMContainer> containerMap;
 +  @VisibleForTesting
 +  final ConcurrentHashMap<ContainerId, AMContainer> containerMap;
+   private String auxiliaryService;
  
    public AMContainerMap(ContainerHeartbeatHandler chh, TaskCommunicatorManagerInterface
tal,
        ContainerSignatureMatcher containerSignatureMatcher, AppContext context) {
@@@ -68,23 -68,11 +72,22 @@@
    }
  
    public boolean addContainerIfNew(Container container, int schedulerId, int launcherId,
int taskCommId) {
 -    AMContainer amc = new AMContainerImpl(container, chh, tal,
 -      containerSignatureMatcher, context, schedulerId, launcherId, taskCommId, auxiliaryService);
 +    AMContainer amc = createAmContainer(container, chh, tal,
-         containerSignatureMatcher, context, schedulerId, launcherId, taskCommId);
- 
++        containerSignatureMatcher, context, schedulerId, launcherId, taskCommId, auxiliaryService);
      return (containerMap.putIfAbsent(container.getId(), amc) == null);
    }
  
 +  AMContainer createAmContainer(Container container,
 +                                ContainerHeartbeatHandler chh,
 +                                TaskCommunicatorManagerInterface tal,
 +                                ContainerSignatureMatcher signatureMatcher,
 +                                AppContext appContext, int schedulerId,
-                                 int launcherId, int taskCommId) {
++                                int launcherId, int taskCommId, String auxiliaryService)
{
 +    AMContainer amc = new AMContainerImpl(container, chh, tal,
-         signatureMatcher, appContext, schedulerId, launcherId, taskCommId);
++        signatureMatcher, appContext, schedulerId, launcherId, taskCommId, auxiliaryService);
 +    return amc;
 +  }
 +
    public AMContainer get(ContainerId containerId) {
      return containerMap.get(containerId);
    }

http://git-wip-us.apache.org/repos/asf/tez/blob/251ca1c3/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --cc tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index 1b9df99,65883ee..d3614d9
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@@ -63,8 -63,8 +63,9 @@@ import org.apache.hadoop.yarn.event.Eve
  import org.apache.hadoop.yarn.util.SystemClock;
  import org.apache.tez.common.security.JobTokenIdentifier;
  import org.apache.tez.common.security.TokenCache;
+ import org.apache.tez.dag.api.TezConfiguration;
  import org.apache.tez.dag.app.TaskCommunicatorWrapper;
 +import org.apache.tez.dag.app.rm.node.AMNodeEventType;
  import org.apache.tez.serviceplugins.api.ContainerEndReason;
  import org.apache.tez.serviceplugins.api.ServicePluginException;
  import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;

http://git-wip-us.apache.org/repos/asf/tez/blob/251ca1c3/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
----------------------------------------------------------------------
diff --cc tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
index efea327,2fcd0c8..d16881c
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainerMap.java
@@@ -1,4 -1,4 +1,4 @@@
--/**
++/*
   * Licensed to the Apache Software Foundation (ASF) under one
   * or more contributor license agreements.  See the NOTICE file
   * distributed with this work for additional information
@@@ -48,104 -39,35 +48,104 @@@ import org.junit.Test
  
  public class TestAMContainerMap {
  
 -  private ContainerHeartbeatHandler mockContainerHeartBeatHandler() {
 -    return mock(ContainerHeartbeatHandler.class);
 -  }
  
 -  private TaskCommunicatorManagerInterface mockTaskAttemptListener() throws ServicePluginException
{
 -    TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class);
 -    TaskCommunicator taskComm = mock(TaskCommunicator.class);
 -    doReturn(new InetSocketAddress("localhost", 21000)).when(taskComm).getAddress();
 -    doReturn(taskComm).when(tal).getTaskCommunicator(0);
 -    return tal;
 -  }
 +  @Test (timeout = 10000)
 +  public void testCleanupOnDagComplete() {
  
 -  private AppContext mockAppContext() {
 +    ContainerHeartbeatHandler chh = mock(ContainerHeartbeatHandler.class);
 +    TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class);
      AppContext appContext = mock(AppContext.class);
 -    return appContext;
 -  }
  
 -  @SuppressWarnings("deprecation")
 -  private ContainerId mockContainerId(int cId) {
 -    ApplicationId appId = ApplicationId.newInstance(1000, 1);
 -    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
 -    ContainerId containerId = ContainerId.newInstance(appAttemptId, cId);
 -    return containerId;
 +
 +
 +    int numContainers = 7;
 +    WrappedContainer[] wContainers = new WrappedContainer[numContainers];
 +    for (int i = 0 ; i < numContainers ; i++) {
 +      WrappedContainer wc =
 +          new WrappedContainer(false, null, i);
 +      wContainers[i] = wc;
 +    }
 +
 +    AMContainerMap amContainerMap = new AMContainerMapForTest(chh, tal, mock(
 +        ContainerSignatureMatcher.class), appContext, wContainers);
 +
 +    for (int i = 0 ; i < numContainers ; i++) {
 +      amContainerMap.addContainerIfNew(wContainers[i].container, 0, 0, 0);
 +    }
 +
 +
 +    // Container 1 in LAUNCHING state
 +    wContainers[0].launchContainer();
 +    wContainers[0].verifyState(AMContainerState.LAUNCHING);
 +
 +    // Container 2 in IDLE state
 +    wContainers[1].launchContainer();
 +    wContainers[1].containerLaunched();
 +    wContainers[1].verifyState(AMContainerState.IDLE);
 +
 +    // Container 3 RUNNING state
 +    wContainers[2].launchContainer();
 +    wContainers[2].containerLaunched();
 +    wContainers[2].assignTaskAttempt(wContainers[2].taskAttemptID);
 +    wContainers[2].verifyState(AMContainerState.RUNNING);
 +
 +    // Cointainer 4 STOP_REQUESTED
 +    wContainers[3].launchContainer();
 +    wContainers[3].containerLaunched();
 +    wContainers[3].stopRequest();
 +    wContainers[3].verifyState(AMContainerState.STOP_REQUESTED);
 +
 +    // Container 5 STOPPING
 +    wContainers[4].launchContainer();
 +    wContainers[4].containerLaunched();
 +    wContainers[4].stopRequest();
 +    wContainers[4].nmStopSent();
 +    wContainers[4].verifyState(AMContainerState.STOPPING);
 +
 +    // Container 6 COMPLETED
 +    wContainers[5].launchContainer();
 +    wContainers[5].containerLaunched();
 +    wContainers[5].stopRequest();
 +    wContainers[5].nmStopSent();
 +    wContainers[5].containerCompleted();
 +    wContainers[5].verifyState(AMContainerState.COMPLETED);
 +
 +    // Container 7 STOP_REQUESTED + ERROR
 +    wContainers[6].launchContainer();
 +    wContainers[6].containerLaunched();
 +    wContainers[6].containerLaunched();
 +    assertTrue(wContainers[6].amContainer.isInErrorState());
 +    wContainers[6].verifyState(AMContainerState.STOP_REQUESTED);
 +
 +    // 7 containers present, and registered with AMContainerMap at this point.
 +
 +    assertEquals(7, amContainerMap.containerMap.size());
 +    amContainerMap.dagComplete(mock(DAG.class));
 +    assertEquals(5, amContainerMap.containerMap.size());
    }
  
 -  private Container mockContainer(ContainerId containerId) {
 -    NodeId nodeId = NodeId.newInstance("localhost", 43255);
 -    Container container = Container.newInstance(containerId, nodeId, "localhost:33333",
 -        Resource.newInstance(1024, 1), Priority.newInstance(1), mock(Token.class));
 -    return container;
 +  private static class AMContainerMapForTest extends AMContainerMap {
 +
 +
 +    private WrappedContainer[] wrappedContainers;
 +
 +    public AMContainerMapForTest(ContainerHeartbeatHandler chh,
 +                                 TaskCommunicatorManagerInterface tal,
 +                                 ContainerSignatureMatcher containerSignatureMatcher,
 +                                 AppContext context, WrappedContainer[] wrappedContainers)
{
 +      super(chh, tal, containerSignatureMatcher, context);
 +      this.wrappedContainers = wrappedContainers;
 +    }
 +
 +    @Override
 +    AMContainer createAmContainer(Container container,
 +                                  ContainerHeartbeatHandler chh,
 +                                  TaskCommunicatorManagerInterface tal,
 +                                  ContainerSignatureMatcher signatureMatcher,
 +                                  AppContext appContext, int schedulerId,
-                                   int launcherId, int taskCommId) {
++                                  int launcherId, int taskCommId, String auxiliaryService)
{
 +      return wrappedContainers[container.getId().getId()].amContainer;
 +    }
 +
    }
  }


Mime
View raw message