tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject tez git commit: TEZ-2561. Port for TaskAttemptListenerImpTezDag should be configurable (zjffdu)
Date Mon, 22 Jun 2015 05:20:04 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.5 0b91397af -> 9264a598b


TEZ-2561. Port for TaskAttemptListenerImpTezDag should be configurable (zjffdu)

(cherry picked from commit 4b29ece20afed83bd5b715119a0446f02550d9be)

Conflicts:
	CHANGES.txt
	tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9264a598
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9264a598
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9264a598

Branch: refs/heads/branch-0.5
Commit: 9264a598b3f62e900110109521d278f06c7a7b08
Parents: 0b91397
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Mon Jun 22 12:45:44 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Mon Jun 22 13:19:40 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/api/TezConfiguration.java    | 11 ++-
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  2 +
 .../app/TestTaskAttemptListenerImplTezDag.java  | 85 ++++++++++++++++++++
 4 files changed, 98 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/9264a598/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 34f5a9c..68cf282 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ ALL CHANGES:
   TEZ-2569. branch 0.5 build fails due to cannot find symbol TaskAttemptTerminationCause
   TEZ-1529. ATS and TezClient integration in secure kerberos enabled cluster.
   TEZ-2557. Port TEZ-1910 to branch-0.5.
+  TEZ-2561. Port for TaskAttemptListenerImpTezDag should be configurable
   TEZ-2566. Allow TaskAttemptFinishedEvent without TaskAttemptStartedEvent when it is KILLED/FAILED
   TEZ-2475. Fix a potential hang in Tez local mode caused by incorrectly handled interrupts.
   TEZ-2548. TezClient submitDAG can hang if the AM is in the process of shutting down.

http://git-wip-us.apache.org/repos/asf/tez/blob/9264a598/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index e4ca689..89fb85d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -361,12 +361,21 @@ public class TezConfiguration extends Configuration {
   
   /**
   * String value. Range of ports that the AM can use when binding for client connections. Leave blank
-   * to use all possible ports. Expert level setting.
+   * to use all possible ports. Expert level setting. It's hadoop standard range configuration.
+   * For example 50000-50050,50100-50200
    */
   public static final String TEZ_AM_CLIENT_AM_PORT_RANGE =
       TEZ_AM_PREFIX + "client.am.port-range";
 
   /**
+   * String value. Range of ports that the AM can use when binding for task connections. Leave blank
+   * to use all possible ports. Expert level setting. It's hadoop standard range configuration.
+   * For example 50000-50050,50100-50200
+   */
+  public static final String TEZ_AM_TASK_AM_PORT_RANGE =
+      TEZ_AM_PREFIX + "task.am.port-range";
+
+  /**
    * String value. The class to be used for DAG Scheduling. Expert level setting.
    */
   public static final String TEZ_AM_DAG_SCHEDULER_CLASS = TEZ_AM_PREFIX + "dag.scheduler.class";

http://git-wip-us.apache.org/repos/asf/tez/blob/9264a598/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index cf34e0e..3864b03 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -129,6 +129,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
             .setNumHandlers(
                 conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT,
                     TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT))
+            .setPortRangeConfig(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE)
             .setSecretManager(jobTokenSecretManager).build();
 
         // Enable service authorization?
@@ -140,6 +141,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
 
         server.start();
         this.address = NetUtils.getConnectAddress(server);
+        LOG.info("Instantiated TaskAttemptListener RPC at " + this.address);
       } catch (IOException e) {
         throw new TezUncheckedException(e);
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/9264a598/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
new file mode 100644
index 0000000..4917fd5
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.junit.Test;
+
+public class TestTaskAttemptListenerImplTezDag {
+
+  // try 10 times to allocate random port, fail it if no one is succeed.
+  @Test(timeout = 5000)
+  public void testPortRange() {
+    boolean succeedToAllocate = false;
+    Random rand = new Random();
+    for (int i = 0; i < 10; ++i) {
+      int nextPort = 1024 + rand.nextInt(65535 - 1024);
+      if (testPortRange(nextPort)) {
+        succeedToAllocate = true;
+        break;
+      }
+    }
+    if (!succeedToAllocate) {
+      fail("Can not allocate free port even in 10 iterations for TaskAttemptListener");
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testPortRange_NotSpecified() {
+    Configuration conf = new Configuration();
+    TaskAttemptListenerImpTezDag taskAttemptListener = new TaskAttemptListenerImpTezDag(
+        mock(AppContext.class), mock(TaskHeartbeatHandler.class),
+        mock(ContainerHeartbeatHandler.class), null);
+    // no exception happen, should started properly
+    taskAttemptListener.init(conf);
+    taskAttemptListener.start();
+  }
+
+  private boolean testPortRange(int port) {
+    boolean succeedToAllocate = true;
+    TaskAttemptListenerImpTezDag taskAttemptListener = null;
+    try {
+      Configuration conf = new Configuration();
+      conf.set(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE, port + "-" + port);
+      taskAttemptListener = new TaskAttemptListenerImpTezDag(
+          mock(AppContext.class), mock(TaskHeartbeatHandler.class),
+          mock(ContainerHeartbeatHandler.class), null);
+      taskAttemptListener.init(conf);
+      taskAttemptListener.start();
+      int resultedPort = taskAttemptListener.getAddress().getPort();
+      assertEquals(port, resultedPort);
+    } catch (Exception e) {
+      succeedToAllocate = false;
+    } finally {
+      if (taskAttemptListener != null) {
+        try {
+          taskAttemptListener.close();
+        } catch (IOException e) {
+          e.printStackTrace();
+          fail("fail to stop TaskAttemptListener");
+        }
+      }
+    }
+    return succeedToAllocate;
+  }
+}


Mime
View raw message