tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-707. Add a LocalContainerLauncher. Contributed by Chen He.
Date Wed, 30 Jul 2014 05:15:10 GMT
Repository: tez
Updated Branches:
  refs/heads/master 282917344 -> e49ed3397


TEZ-707. Add a LocalContainerLauncher. Contributed by Chen He.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/e49ed339
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/e49ed339
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/e49ed339

Branch: refs/heads/master
Commit: e49ed3397cefe80700be58c3739d3d8d98d40c04
Parents: 2829173
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Jul 29 22:14:42 2014 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Jul 29 22:14:42 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  17 +-
 .../app/launcher/LocalContainerLauncher.java    | 276 +++++++++++++++++++
 .../org/apache/tez/runtime/task/TezChild.java   |  83 ++++--
 3 files changed, 345 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/e49ed339/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 4e55376..8f4bbab 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -45,6 +45,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
@@ -60,6 +61,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -130,6 +132,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
 import org.apache.tez.dag.app.launcher.ContainerLauncher;
 import org.apache.tez.dag.app.launcher.ContainerLauncherImpl;
+import org.apache.tez.dag.app.launcher.LocalContainerLauncher;
 import org.apache.tez.dag.app.rm.AMSchedulerEventType;
 import org.apache.tez.dag.app.rm.NMCommunicatorEventType;
 import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
@@ -218,6 +221,8 @@ public class DAGAppMaster extends AbstractService {
   private final Map<String, LocalResource> sessionResources =
     new HashMap<String, LocalResource>();
 
+  private boolean isLocal = false; //Local mode flag
+
   private DAGAppMasterShutdownHandler shutdownHandler =
       new DAGAppMasterShutdownHandler();
 
@@ -297,6 +302,12 @@ public class DAGAppMaster extends AbstractService {
     isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
 
     this.amConf = conf;
+    this.isLocal = conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
+        TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
+    if (isLocal) {
+       UserGroupInformation.setConfiguration(conf);
+       appMasterUgi = UserGroupInformation.getCurrentUser();
+    }
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
 
     dispatcher = createDispatcher();
@@ -797,7 +808,11 @@ public class DAGAppMaster extends AbstractService {
 
   protected ContainerLauncher
       createContainerLauncher(final AppContext context) {
-    return new ContainerLauncherImpl(context);
+    if(isLocal){
+      return new LocalContainerLauncher(context, taskAttemptListener);
+    } else {
+      return new ContainerLauncherImpl(context);
+    }
   }
 
   public ApplicationId getAppID() {

http://git-wip-us.apache.org/repos/asf/tez/blob/e49ed339/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
new file mode 100644
index 0000000..9ff9a07
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+
+import java.io.IOException;
+import java.util.Random;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.Clock;
+
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.rm.NMCommunicatorEventType;
+import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
+import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed;
+import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
+import org.apache.tez.dag.app.rm.container.AMContainerEventStopFailed;
+import org.apache.tez.dag.app.rm.container.AMContainerEventType;
+import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.runtime.task.TezChild;
+
+
+/**
+ * Runs the container task locally in a thread.
+ * Since all (sub)tasks share the same local directory, they must be executed
+ * sequentially in order to avoid creating/deleting the same files/dirs.
+ */
+public class LocalContainerLauncher extends AbstractService implements
+  ContainerLauncher {
+
+  private static final Log LOG = LogFactory.getLog(LocalContainerLauncher.class);
+  private final AppContext context;
+  private TaskAttemptListener taskAttemptListener;
+  private BlockingQueue<NMCommunicatorEvent> eventQueue =
+    new LinkedBlockingQueue<NMCommunicatorEvent>();
+  private static AtomicBoolean serviceStopped;
+
+  private Clock clock;
+  private LinkedBlockingQueue<Runnable> taskQueue =
+    new LinkedBlockingQueue<Runnable>();
+
+  private ThreadPoolExecutor taskExecutor;
+  private ListeningExecutorService listeningExecutorService;
+  private int poolSize;
+  private final Random sleepTime = new Random();
+
+  public LocalContainerLauncher(AppContext context,
+                                TaskAttemptListener taskAttemptListener) {
+    super(LocalContainerLauncher.class.getName());
+    this.context = context;
+    this.taskAttemptListener = taskAttemptListener;
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    Thread eventHandlingThread = new Thread(new TezSubTaskRunner(),
+      "LocalContainerLauncher-SubTaskRunner");
+    eventHandlingThread.start();
+    super.serviceStart();
+  }
+
+  @Override
+  public synchronized void serviceInit(Configuration config) {
+    serviceStopped = new AtomicBoolean(false);
+    this.clock = context.getClock();
+    this.poolSize = config.getInt(TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS,
+      TezConfiguration.TEZ_AM_INLINE_TASK_EXECUTION_MAX_TASKS_DEFAULT);
+    int maxPoolSize = poolSize;
+
+    this.taskExecutor = new ThreadPoolExecutor(poolSize, maxPoolSize, 60*1000,
+      TimeUnit.SECONDS, taskQueue);
+    this.listeningExecutorService = MoreExecutors.listeningDecorator(taskExecutor);
+  }
+
+  @SuppressWarnings("unchecked")
+  void sendContainerLaunchFailedMsg(ContainerId containerId, String message) {
+    LOG.error(message);
+    context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, message));
+  }
+
+  //should mimic container using threads
+  //need to start all MapProcessor and RedProcessor here
+  private class TezSubTaskRunner implements Runnable {
+
+    ListenableFuture<Object> runningTask;
+    TezSubTaskRunner() {}
+
+    //launch tasks
+    private void launch(NMCommunicatorEvent event) {
+      NMCommunicatorLaunchRequestEvent launchEv = (NMCommunicatorLaunchRequestEvent)event;
+
+      String containerIdStr = event.getContainerId().toString();
+      String host = taskAttemptListener.getAddress().getAddress().getHostAddress();
+      int port = taskAttemptListener.getAddress().getPort();
+      String tokenIdentifier = context.getApplicationID().toString();
+
+      String[] localDirs =
+        StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS.name()));
+
+      try {
+        runningTask = listeningExecutorService.submit(createSubTask(context.getAMConf(),
+          host, port, containerIdStr, tokenIdentifier, context.getApplicationAttemptId().getAttemptId(),
+          localDirs, (TezTaskUmbilicalProtocol) taskAttemptListener));
+        Futures.addCallback(runningTask,
+          new FutureCallback<Object>() {
+            @Override
+            public void onSuccess(Object result) {
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+              LOG.error("Container launching failed", t);
+            }
+          }
+          , taskExecutor);
+      } catch (Throwable throwable) {
+        LOG.info("Failed to start runSubTask thread!", throwable);
+        sendContainerLaunchFailedMsg(event.getContainerId(), "Container Launching Failed!");
+      }
+
+      try{
+        context.getEventHandler().handle(
+          new AMContainerEventLaunched(launchEv.getContainerId()));
+        ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
+          event.getContainerId(), clock.getTime(), context.getApplicationAttemptId());
+        context.getHistoryHandler().handle(new DAGHistoryEvent(context.getCurrentDAGID(),lEvt));
+      } catch (Throwable t) {
+        String message = "Container launch failed for " +  event.getContainerId() + " : " +
+          StringUtils.stringifyException(t);
+        t.printStackTrace();
+        LOG.error(message);
+        context.getEventHandler().handle(new AMContainerEventLaunchFailed(event.getContainerId(), message));
+      }
+    }
+
+    private void stop(NMCommunicatorEvent event) {
+      try{
+        context.getEventHandler().handle(
+          new AMContainerEvent(event.getContainerId(), AMContainerEventType.C_NM_STOP_SENT));
+      } catch (Throwable t) {
+        // ignore the cleanup failure
+        String message = "cleanup failed for container " +  event.getContainerId() + " : " +
+          StringUtils.stringifyException(t);
+        context.getEventHandler().handle(
+          new AMContainerEventStopFailed(event.getContainerId(), message));
+        LOG.warn(message);
+      }
+    }
+
+    @Override
+    public void run() {
+      NMCommunicatorEvent event;
+      while (!Thread.currentThread().isInterrupted()) {
+        while (taskExecutor.getActiveCount() >= poolSize){
+          try {
+            LOG.info("Number of Running Tasks reach the uppper bound, sleep 1 seconds!:" +
+              taskExecutor.getActiveCount());
+            Thread.sleep(1000 + sleepTime.nextInt(10) * 1000);
+          } catch (InterruptedException e) {
+            LOG.warn("Thread Sleep has been interrupted!", e);
+          }
+        }
+
+        try {
+          event = eventQueue.take();
+        } catch (InterruptedException e) {  // mostly via T_KILL? JOB_KILL?
+          LOG.error("Returning, interrupted : ", e);
+          return;
+        }
+
+        LOG.info("Processing the event " + event.toString());
+        if (event.getType() == NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST) {
+          launch(event);
+        } else if (event.getType() == NMCommunicatorEventType.CONTAINER_STOP_REQUEST) {
+          stop(event);
+        } else {
+          LOG.warn("Ignoring unexpected event " + event.toString());
+        }
+      }
+    }
+  } //end SubTaskRunner
+
+  //create a SubTask
+  private synchronized Callable<Object> createSubTask(final Configuration defaultConf, final String host,
+      final int port, final String containerId, final String tokenIdentifier, final int attemptNumber,
+      final String[] localDirs, final TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol) {
+    return new Callable<Object>() {
+      @Override
+      public Object call() {
+        // Pull in configuration specified for the session.
+        try {
+          TezChild tezChild;
+          tezChild = TezChild.newTezChild(defaultConf, host, port, containerId, tokenIdentifier,
+            attemptNumber, localDirs);
+          tezChild.setUmbilical(tezTaskUmbilicalProtocol);
+          tezChild.run();
+        } catch (TezException e) {
+          //need to report the TezException and stop this task
+          LOG.error("Failed to add User Specified TezConfiguration!", e);
+        } catch (IOException e) {
+          //need to report the IOException and stop this task
+          LOG.error("IOE in launching task!", e);
+        } catch (InterruptedException e) {
+          //need to report the IOException and stop this task
+          LOG.error("Interruption happened during launching task!", e);
+        }
+        return null;
+      }
+    };
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    if (taskExecutor != null) {
+      taskExecutor.shutdownNow();
+    }
+
+    if (listeningExecutorService != null) {
+      listeningExecutorService.shutdownNow();
+    }
+
+    serviceStopped.set(true);
+    super.serviceStop();
+  }
+
+  @Override
+  public void handle(NMCommunicatorEvent event) {
+    try {
+      eventQueue.put(event);
+    } catch (InterruptedException e) {
+      throw new TezUncheckedException(e);  // FIXME? YarnRuntimeException is "for runtime exceptions only"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/e49ed339/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
index 131def0..5debca7 100644
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -90,6 +90,7 @@ public class TezChild {
   private final int amHeartbeatInterval;
   private final long sendCounterInterval;
   private final int maxEventsToGet;
+  private final boolean isLocal;
 
   private final ListeningExecutorService executor;
   private final ObjectRegistryImpl objectRegistry;
@@ -104,7 +105,8 @@ public class TezChild {
 
   public TezChild(Configuration conf, String host, int port, String containerIdentifier,
       String tokenIdentifier, int appAttemptNumber, String[] localDirs,
-      ObjectRegistryImpl objectRegistry) throws IOException, InterruptedException {
+      ObjectRegistryImpl objectRegistry)
+      throws IOException, InterruptedException {
     this.defaultConf = conf;
     this.containerIdString = containerIdentifier;
     this.appAttemptNumber = appAttemptNumber;
@@ -141,6 +143,8 @@ public class TezChild {
       }
     }
 
+    this.isLocal = defaultConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
+        TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
     UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(tokenIdentifier);
     Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials);
     SecurityUtil.setTokenService(jobToken, address);
@@ -149,16 +153,18 @@ public class TezChild {
     serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,
         ShuffleUtils.convertJobTokenToBytes(jobToken));
 
-    umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
-      @Override
-      public TezTaskUmbilicalProtocol run() throws Exception {
-        return (TezTaskUmbilicalProtocol) RPC.getProxy(TezTaskUmbilicalProtocol.class,
-            TezTaskUmbilicalProtocol.versionID, address, defaultConf);
-      }
-    });
+    if (!isLocal) {
+      umbilical = taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
+        @Override
+        public TezTaskUmbilicalProtocol run() throws Exception {
+          return RPC.getProxy(TezTaskUmbilicalProtocol.class,
+              TezTaskUmbilicalProtocol.versionID, address, defaultConf);
+        }
+      });
+    }
   }
   
-  void run() throws IOException, InterruptedException, TezException {
+  public void run() throws IOException, InterruptedException, TezException {
 
     ContainerContext containerContext = new ContainerContext(containerIdString);
     ContainerReporter containerReporter = new ContainerReporter(umbilical, containerContext,
@@ -205,7 +211,7 @@ public class TezChild {
         TezTaskRunner taskRunner = new TezTaskRunner(new TezConfiguration(defaultConf), childUGI,
             localDirs, containerTask.getTaskSpec(), umbilical, appAttemptNumber,
             serviceConsumerMetadata, startedInputsMap, taskReporter, executor, objectRegistry);
-        boolean shouldDie = false;
+        boolean shouldDie;
         try {
           shouldDie = !taskRunner.run();
           if (shouldDie) {
@@ -233,12 +239,12 @@ public class TezChild {
    *          the new task specification. Must be a valid task
    * @param childUGI
    *          the old UGI instance being used
-   * @return
+   * @return childUGI
    */
   UserGroupInformation handleNewTaskCredentials(ContainerTask containerTask,
       UserGroupInformation childUGI) {
     // Re-use the UGI only if the Credentials have not changed.
-    Preconditions.checkState(containerTask.shouldDie() != true);
+    Preconditions.checkState(!containerTask.shouldDie());
     Preconditions.checkState(containerTask.getTaskSpec() != null);
     if (containerTask.haveCredentialsChanged()) {
       LOG.info("Refreshing UGI since Credentials have changed");
@@ -294,7 +300,7 @@ public class TezChild {
    *          the new task specification. Must be a valid task
    */
   private void cleanupOnTaskChanged(ContainerTask containerTask) {
-    Preconditions.checkState(containerTask.shouldDie() != true);
+    Preconditions.checkState(!containerTask.shouldDie());
     Preconditions.checkState(containerTask.getTaskSpec() != null);
     TezVertexID newVertexID = containerTask.getTaskSpec().getTaskAttemptID().getTaskID()
         .getVertexID();
@@ -317,30 +323,29 @@ public class TezChild {
     }
     RPC.stopProxy(umbilical);
     DefaultMetricsSystem.shutdown();
-    LogManager.shutdown();
+    if (!isLocal) {
+      LogManager.shutdown();
+    }
   }
 
-  public static void main(String[] args) throws IOException, InterruptedException, TezException {
-    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
-    LOG.info("TezChild starting");
+  public void setUmbilical(TezTaskUmbilicalProtocol tezTaskUmbilicalProtocol){
+    if(tezTaskUmbilicalProtocol != null){
+      this.umbilical = tezTaskUmbilicalProtocol;
+    }
+  }
+
+  public static TezChild newTezChild(Configuration conf, String host, int port, String containerIdentifier,
+      String tokenIdentifier, int attemptNumber, String[] localDirs)
+      throws IOException, InterruptedException, TezException {
 
-    final Configuration defaultConf = new Configuration();
     // Pull in configuration specified for the session.
     // TODO TEZ-1233. This needs to be moved over the wire rather than localizing the file
     // for each and every task, and reading it back from disk. Also needs to be per vertex.
-    TezUtils.addUserSpecifiedTezConfiguration(defaultConf);
-    UserGroupInformation.setConfiguration(defaultConf);
-    Limits.setConfiguration(defaultConf);
+    TezUtils.addUserSpecifiedTezConfiguration(conf);
+    UserGroupInformation.setConfiguration(conf);
+    Limits.setConfiguration(conf);
 
-    assert args.length == 5;
-    String host = args[0];
-    int port = Integer.parseInt(args[1]);
-    final String containerIdentifier = args[2];
-    final String tokenIdentifier = args[3];
-    final int attemptNumber = Integer.parseInt(args[4]);
     final String pid = System.getenv().get("JVM_PID");
-    final String[] localDirs = StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS
-        .name()));
     LOG.info("PID, containerIdentifier:  " + pid + ", " + containerIdentifier);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Info from cmd line: AM-host: " + host + " AM-port: " + port
@@ -355,9 +360,27 @@ public class TezChild {
     // singleton of ObjectRegistry for this JVM
     ObjectRegistryImpl objectRegistry = new ObjectRegistryImpl();
 
-    TezChild tezChild = new TezChild(defaultConf, host, port, containerIdentifier, tokenIdentifier,
+    return new TezChild(conf, host, port, containerIdentifier, tokenIdentifier,
         attemptNumber, localDirs, objectRegistry);
+  }
+
+  public static void main(String[] args) throws IOException, InterruptedException, TezException {
+
+    final Configuration defaultConf = new Configuration();
+
+    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+    LOG.info("TezChild starting");
 
+    assert args.length == 5;
+    String host = args[0];
+    int port = Integer.parseInt(args[1]);
+    final String containerIdentifier = args[2];
+    final String tokenIdentifier = args[3];
+    final int attemptNumber = Integer.parseInt(args[4]);
+    final String[] localDirs = StringUtils.getTrimmedStrings(System.getenv(Environment.LOCAL_DIRS
+        .name()));
+    TezChild tezChild = newTezChild(defaultConf, host, port, containerIdentifier,
+        tokenIdentifier, attemptNumber, localDirs);
     tezChild.run();
   }
 


Mime
View raw message