tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [12/12] tez git commit: TEZ-2019. Temporarily allow the scheduler and launcher to be specified via configuration. (sseth)
Date Thu, 12 Feb 2015 07:27:58 GMT
TEZ-2019. Temporarily allow the scheduler and launcher to be specified
via configuration. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a72dba84
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a72dba84
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a72dba84

Branch: refs/heads/TEZ-2003
Commit: a72dba847911585eddabf420a7e877fa2bdf68fd
Parents: 54bd104
Author: Siddharth Seth <sseth@apache.org>
Authored: Fri Jan 30 16:02:32 2015 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Feb 11 22:59:00 2015 -0800

----------------------------------------------------------------------
 TEZ-2003-CHANGES.txt                            |  4 +++
 .../apache/tez/dag/api/TezConfiguration.java    |  6 ++++
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 30 ++++++++++++++++-
 .../dag/app/rm/TaskSchedulerEventHandler.java   | 34 ++++++++++++++++++--
 .../org/apache/tez/runtime/task/TezChild.java   |  3 +-
 5 files changed, 73 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a72dba84/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
new file mode 100644
index 0000000..1822fcb
--- /dev/null
+++ b/TEZ-2003-CHANGES.txt
@@ -0,0 +1,4 @@
+ALL CHANGES:
+  TEZ-2019. Temporarily allow the scheduler and launcher to be specified via configuration.
+
+INCOMPATIBLE CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/a72dba84/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index cfd6426..df233e1 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1136,6 +1136,12 @@ public class TezConfiguration extends Configuration {
       + "tez-ui.webservice.enable";
   public static final boolean TEZ_AM_WEBSERVICE_ENABLE_DEFAULT = true;
 
+  @ConfigurationScope(Scope.VERTEX)
+  public static final String TEZ_AM_CONTAINER_LAUNCHER_CLASS = TEZ_AM_PREFIX + "container-launcher.class";
+  @ConfigurationScope(Scope.VERTEX)
+  public static final String TEZ_AM_TASK_SCHEDULER_CLASS = TEZ_AM_PREFIX + "task-scheduler.class";
+
+
   // TODO only validate property here, value can also be validated if necessary
   public static void validateProperty(String property, Scope usedScope) {
     Scope validScope = PropertyScope.get(property);

http://git-wip-us.apache.org/repos/asf/tez/blob/a72dba84/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 8fd5626..9f523ac 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -27,6 +27,8 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
@@ -91,6 +93,7 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.AsyncDispatcher;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezConverterUtils;
 import org.apache.tez.common.TezUtilsInternal;
@@ -938,9 +941,34 @@ public class DAGAppMaster extends AbstractService {
   protected ContainerLauncher
       createContainerLauncher(final AppContext context) throws UnknownHostException {
     if(isLocal){
+      LOG.info("Creating LocalContainerLauncher");
       return new LocalContainerLauncher(context, taskAttemptListener, workingDirectory);
     } else {
-      return new ContainerLauncherImpl(context);
+      // TODO: Temporary reflection with specific parameters until a clean interface is defined.
+      String containerLauncherClassName = getConfig().get(TezConfiguration.TEZ_AM_CONTAINER_LAUNCHER_CLASS);
+      if (containerLauncherClassName == null) {
+        LOG.info("Creating Default Container Launcher");
+        return new ContainerLauncherImpl(context);
+      } else {
+        LOG.info("Creating container launcher : " + containerLauncherClassName);
+        Class<? extends ContainerLauncher> containerLauncherClazz = (Class<? extends ContainerLauncher>) ReflectionUtils.getClazz(
+            containerLauncherClassName);
+        try {
+          Constructor<? extends ContainerLauncher> ctor = containerLauncherClazz
+              .getConstructor(AppContext.class, Configuration.class, TaskAttemptListener.class);
+          ctor.setAccessible(true);
+          ContainerLauncher instance = ctor.newInstance(context, getConfig(), taskAttemptListener);
+          return instance;
+        } catch (NoSuchMethodException e) {
+          throw new TezUncheckedException(e);
+        } catch (InvocationTargetException e) {
+          throw new TezUncheckedException(e);
+        } catch (InstantiationException e) {
+          throw new TezUncheckedException(e);
+        } catch (IllegalAccessException e) {
+          throw new TezUncheckedException(e);
+        }
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/a72dba84/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 4fd655e..72dcd95 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.dag.app.rm;
 
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -42,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.TaskLocationHint;
@@ -329,12 +332,39 @@ public class TaskSchedulerEventHandler extends AbstractService
     boolean isLocal = getConfig().getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
         TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
     if (isLocal) {
+      LOG.info("Using TaskScheduler: LocalTaskSchedulerService");
       return new LocalTaskSchedulerService(this, this.containerSignatureMatcher,
           host, port, trackingUrl, appContext);
     }
     else {
-      return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
-          host, port, trackingUrl, appContext);
+      String schedulerClassName = getConfig().get(TezConfiguration.TEZ_AM_TASK_SCHEDULER_CLASS);
+      if (schedulerClassName == null) {
+        LOG.info("Using TaskScheduler: YarnTaskSchedulerService");
+        return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
+            host, port, trackingUrl, appContext);
+      } else {
+        LOG.info("Using custom TaskScheduler: " + schedulerClassName);
+        // TODO Temporary reflection with specific parameters. Remove once there is a clean interface.
+        Class<? extends TaskSchedulerService> taskSchedulerClazz =
+            (Class<? extends TaskSchedulerService>) ReflectionUtils.getClazz(schedulerClassName);
+        try {
+          Constructor<? extends TaskSchedulerService> ctor = taskSchedulerClazz
+              .getConstructor(TaskSchedulerAppCallback.class, AppContext.class, String.class,
+                  Integer.class, String.class, Configuration.class);
+          ctor.setAccessible(true);
+          TaskSchedulerService taskSchedulerService =
+              ctor.newInstance(this, appContext, host, port, trackingUrl, getConfig());
+          return taskSchedulerService;
+        } catch (NoSuchMethodException e) {
+          throw new TezUncheckedException(e);
+        } catch (InvocationTargetException e) {
+          throw new TezUncheckedException(e);
+        } catch (InstantiationException e) {
+          throw new TezUncheckedException(e);
+        } catch (IllegalAccessException e) {
+          throw new TezUncheckedException(e);
+        }
+      }
     }
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/a72dba84/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
index d537846..6164e52 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -356,7 +356,8 @@ public class TezChild {
       DefaultMetricsSystem.shutdown();
       if (!isLocal) {
         RPC.stopProxy(umbilical);
-        LogManager.shutdown();
+        // TODO Temporary change. Revert. Ideally, move this over to the main method in TezChild if possible.
+//        LogManager.shutdown();
       }
     }
   }


Mime
View raw message