tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject svn commit: r1476402 - in /incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app: ./ dag/impl/ local/ rm/
Date Fri, 26 Apr 2013 20:58:01 GMT
Author: bikas
Date: Fri Apr 26 20:58:00 2013
New Revision: 1476402

URL: http://svn.apache.org/r1476402
Log:
TEZ.85 Remove legacy RMCommunicator/Requestor/Allocator (bikas)

Removed:
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/local/
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/ContainerRequestor.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicator.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicatorEvent.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicatorEventContainerDeAllocateRequest.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMCommunicatorEventType.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/RMContainerRequestor.java
Modified:
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java?rev=1476402&r1=1476401&r2=1476402&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
(original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Fri Apr 26 20:58:00 2013
@@ -54,13 +54,11 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -86,16 +84,8 @@ import org.apache.tez.dag.app.dag.event.
 import org.apache.tez.dag.app.dag.impl.DAGImpl;
 import org.apache.tez.dag.app.launcher.ContainerLauncher;
 import org.apache.tez.dag.app.launcher.ContainerLauncherImpl;
-import org.apache.tez.dag.app.local.LocalContainerRequestor;
-import org.apache.tez.dag.app.rm.AMSchedulerEvent;
 import org.apache.tez.dag.app.rm.AMSchedulerEventType;
-import org.apache.tez.dag.app.rm.ContainerAllocator;
-import org.apache.tez.dag.app.rm.ContainerRequestor;
 import org.apache.tez.dag.app.rm.NMCommunicatorEventType;
-import org.apache.tez.dag.app.rm.RMCommunicator;
-import org.apache.tez.dag.app.rm.RMCommunicatorEvent;
-import org.apache.tez.dag.app.rm.RMContainerRequestor;
-import org.apache.tez.dag.app.rm.RMContainerRequestor.ContainerRequest;
 import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
 import org.apache.tez.dag.app.rm.container.AMContainer;
 import org.apache.tez.dag.app.rm.container.AMContainerEventType;
@@ -171,8 +161,6 @@ public class DAGAppMaster extends Compos
   //  private boolean newApiCommitter;
   private DagEventDispatcher dagEventDispatcher;
   private VertexEventDispatcher vertexEventDispatcher;
-  private AbstractService stagingDirCleanerService;
-  private boolean inRecovery = false;
   //private SpeculatorEventDispatcher speculatorEventDispatcher;
   private TaskSchedulerEventHandler taskSchedulerEventHandler;
 
@@ -234,30 +222,6 @@ public class DAGAppMaster extends Compos
     // TODO Committer.
     //    committer = createOutputCommitter(conf);
 
-    // TODO Recovery
-    /*
-    boolean recoveryEnabled = conf.getBoolean(
-        MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
-    boolean recoverySupportedByCommitter = committer.isRecoverySupported();
-    if (recoveryEnabled && recoverySupportedByCommitter
-        && appAttemptID.getAttemptId() > 1) {
-      LOG.info("Recovery is enabled. "
-          + "Will try to recover from previous life on best effort basis.");
-      recoveryServ = createRecoveryService(context);
-      addIfService(recoveryServ);
-      dispatcher = recoveryServ.getDispatcher();
-      clock = recoveryServ.getClock();
-      inRecovery = true;
-    } else {
-      LOG.info("Not starting RecoveryService: recoveryEnabled: "
-          + recoveryEnabled + " recoverySupportedByCommitter: "
-          + recoverySupportedByCommitter + " ApplicationAttemptID: "
-          + appAttemptID.getAttemptId());
-      dispatcher = createDispatcher();
-      addIfService(dispatcher);
-    }
-    */
-
     dispatcher = createDispatcher();
     addIfService(dispatcher);
 
@@ -290,14 +254,6 @@ public class DAGAppMaster extends Compos
     clientService = new TezClientService();
     addIfService(clientService);
 
-    // TODO JobHistory
-    /*
-    //service to log job history events
-    jobHistoryEventHandler = createJobHistoryHandler(context);
-    dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
-        jobHistoryEventHandler);
-     */
-
     this.dagEventDispatcher = new DagEventDispatcher();
     this.vertexEventDispatcher = new VertexEventDispatcher();
 
@@ -310,13 +266,6 @@ public class DAGAppMaster extends Compos
         new TaskAttemptEventDispatcher());
     dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
 
-    // TODO TEZ-14
-    // speculator = createSpeculator(conf, context);
-    // addIfService(speculator);
-    //speculatorEventDispatcher = new SpeculatorEventDispatcher(conf);
-    //dispatcher.register(Speculator.EventType.class,
-    //    speculatorEventDispatcher);
-
     //    TODO XXX: Rename to NMComm
     //    corresponding service to launch allocated containers via NodeManager
     //    containerLauncher = createNMCommunicator(context);
@@ -330,22 +279,6 @@ public class DAGAppMaster extends Compos
     dispatcher.register(AMSchedulerEventType.class,
         taskSchedulerEventHandler);
 
-    // Add the staging directory cleaner before the history server but after
-    // the container allocator so the staging directory is cleaned after
-    // the history has been flushed but before unregistering with the RM.
-    this.stagingDirCleanerService = createStagingDirCleaningService();
-    addService(stagingDirCleanerService);
-
-
-    // Add the JobHistoryEventHandler last so that it is properly stopped first.
-    // This will guarantee that all history-events are flushed before AM goes
-    // ahead with shutdown.
-    // Note: Even though JobHistoryEventHandler is started last, if any
-    // component creates a JobHistoryEvent in the meanwhile, it will be just be
-    // queued inside the JobHistoryEventHandler
-    // TODO JobHistory
-    //addIfService(this.jobHistoryEventHandler);
-
     super.init(conf);
   } // end of init()
 
@@ -399,32 +332,6 @@ public class DAGAppMaster extends Compos
   }
 
   /**
-   * clean up staging directories for the job.
-   * @throws IOException
-   */
-  // TODO DAG Cleanup staging directory as a user task, or a post dag plugin.
-//  public void cleanupStagingDir() throws IOException {
-//    /* make sure we clean the staging files */
-//    String jobTempDir = null;
-//    FileSystem fs = getFileSystem(getConfig());
-//    try {
-//      if (!keepJobFiles(new JobConf(getConfig()))) {
-//        jobTempDir = getConfig().get(MRJobConfig.MAPREDUCE_JOB_DIR);
-//        if (jobTempDir == null) {
-//          LOG.warn("Job Staging directory is null");
-//          return;
-//        }
-//        Path jobTempDirPath = new Path(jobTempDir);
-//        LOG.info("Deleting staging directory " + FileSystem.getDefaultUri(getConfig())
+
-//            " " + jobTempDir);
-//        fs.delete(jobTempDirPath, true);
-//      }
-//    } catch(IOException io) {
-//      LOG.error("Failed to cleanup staging dir " + jobTempDir, io);
-//    }
-//  }
-
-  /**
    * Exit call. Just in a function call to enable testing.
    */
   protected void sysexit() {
@@ -636,20 +543,6 @@ public class DAGAppMaster extends Compos
 //    return new ContainerRequestorRouter(clientService, appContext);
 //  }
 
-  /**
-   * Create the AM Scheduler.
-   *
-   * @param requestor
-   *          The Container Requestor.
-   * @param appContext
-   *          the application context.
-   * @return an instance of the AMScheduler.
-   */
-  protected ContainerAllocator createAMScheduler(ContainerRequestor requestor,
-      AppContext appContext) {
-    return new AMSchedulerRouter(requestor, appContext);
-  }
-
   /** Create and initialize (but don't start) a single dag. */
   protected DAG createDAG(DAGConfiguration dagPlan) {
 
@@ -731,52 +624,6 @@ public class DAGAppMaster extends Compos
     }
   }
 
-  // TODO JobHistory
-  /*
-  protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
-      AppContext context) {
-    return new JobHistoryEventHandler2(context, getStartCount());
-  }
-  */
-
-  protected AbstractService createStagingDirCleaningService() {
-    return new StagingDirCleaningService();
-  }
-
-//  protected Speculator createSpeculator(Configuration conf, AppContext context) {
-//    Class<? extends Speculator> speculatorClass;
-//
-//    try {
-//      speculatorClass
-//          // "yarn.mapreduce.job.speculator.class"
-//          = conf.getClass(DAGConfiguration.DAG_AM_SPECULATOR_CLASS,
-//                          DefaultSpeculator.class,
-//                          Speculator.class);
-//      Constructor<? extends Speculator> speculatorConstructor
-//          = speculatorClass.getConstructor
-//               (Configuration.class, AppContext.class);
-//      Speculator result = speculatorConstructor.newInstance(conf, context);
-//
-//      return result;
-//    } catch (InstantiationException ex) {
-//      LOG.error("Can't make a speculator -- check "
-//          + DAGConfiguration.DAG_AM_SPECULATOR_CLASS, ex);
-//      throw new YarnException(ex);
-//    } catch (IllegalAccessException ex) {
-//      LOG.error("Can't make a speculator -- check "
-//          + DAGConfiguration.DAG_AM_SPECULATOR_CLASS, ex);
-//      throw new YarnException(ex);
-//    } catch (InvocationTargetException ex) {
-//      LOG.error("Can't make a speculator -- check "
-//          + DAGConfiguration.DAG_AM_SPECULATOR_CLASS, ex);
-//      throw new YarnException(ex);
-//    } catch (NoSuchMethodException ex) {
-//      LOG.error("Can't make a speculator -- check "
-//          + DAGConfiguration.DAG_AM_SPECULATOR_CLASS, ex);
-//      throw new YarnException(ex);
-//    }
-//  }
-
   protected TaskAttemptListener createTaskAttemptListener(AppContext context,
       TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh) {
     TaskAttemptListener lis =
@@ -841,13 +688,6 @@ public class DAGAppMaster extends Compos
     return dispatcher;
   }
 
-  // TODO Recovery
-  /*
-  public Map<TezTaskID, TaskInfo> getCompletedTaskFromPreviousRun() {
-    return completedTasksFromPreviousRun;
-  }
-  */
-
   public ContainerLauncher getContainerLauncher() {
     return containerLauncher;
   }
@@ -856,6 +696,22 @@ public class DAGAppMaster extends Compos
     return taskAttemptListener;
   }
   
+  public ContainerId getAppContainerId() {
+    return containerID;
+  }
+  
+  public String getAppNMHost() {
+    return nmHost;
+  }
+  
+  public int getAppNMPort() {
+    return nmPort;
+  }
+  
+  public int getAppNMHttpPort() {
+    return nmHttpPort;
+  }
+
   public DAGAppMasterState getState() {
     return state;
   }
@@ -889,148 +745,6 @@ public class DAGAppMaster extends Compos
     LOG.info("On DAG completion. Old state: " + oldState + " new state: " + state);
   }
 
-  /**
-   * By the time life-cycle of this router starts, job-init would have already
-   * happened.
-   */
-//  private final class ContainerRequestorRouter extends AbstractService
-//      implements ContainerRequestor {
-//    private final ClientService clientService;
-//    private final AppContext context;
-//    private ContainerRequestor real;
-//
-//    public ContainerRequestorRouter(ClientService clientService,
-//        AppContext appContext) {
-//      super(ContainerRequestorRouter.class.getName());
-//      this.clientService = clientService;
-//      this.context = appContext;
-//    }
-//
-//    @Override
-//    public void start() {
-//      if (dag.isUber()) {
-//        real = new LocalContainerRequestor(clientService,
-//            context);
-//      } else {
-//        real = new RMContainerRequestor(clientService, context);
-//      }
-//      ((Service)this.real).init(getConfig());
-//      ((Service)this.real).start();
-//      super.start();
-//    }
-//
-//    @Override
-//    public void stop() {
-//      if (real != null) {
-//        ((Service) real).stop();
-//      }
-//      super.stop();
-//    }
-//
-//    @Override
-//    public void handle(RMCommunicatorEvent event) {
-//      real.handle(event);
-//    }
-//
-//    @Override
-//    public Resource getAvailableResources() {
-//      return real.getAvailableResources();
-//    }
-//
-//    @Override
-//    public void addContainerReq(ContainerRequest req) {
-//      real.addContainerReq(req);
-//    }
-//
-//    @Override
-//    public void decContainerReq(ContainerRequest req) {
-//      real.decContainerReq(req);
-//    }
-//
-//    public void setSignalled(boolean isSignalled) {
-//      ((RMCommunicator) real).setSignalled(isSignalled);
-//    }
-//
-//    @Override
-//    public Map<ApplicationAccessType, String> getApplicationACLs() {
-//      return ((RMCommunicator)real).getApplicationAcls();
-//    }
-//  }
-
-  /**
-   * By the time life-cycle of this router starts, job-init would have already
-   * happened.
-   */
-  private final class AMSchedulerRouter extends AbstractService
-      implements ContainerAllocator {
-    private final ContainerRequestor requestor;
-    private final AppContext context;
-    private ContainerAllocator containerAllocator;
-
-    AMSchedulerRouter(ContainerRequestor requestor,
-        AppContext context) {
-      super(AMSchedulerRouter.class.getName());
-      this.requestor = requestor;
-      this.context = context;
-    }
-
-    @Override
-    public synchronized void start() {
-      // TODO LocalContainerAllocator
-      /*
-      if (job.isUber()) {
-        this.containerAllocator = new LocalContainerAllocator(this.context,
-            jobId, nmHost, nmPort, nmHttpPort, containerID,
-            (MRxTaskUmbilicalProtocol) taskAttemptListener, taskAttemptListener,
-            (RMCommunicator) this.requestor);
-      } else {
-        this.containerAllocator = new RMContainerAllocator(this.requestor,
-            this.context);
-      }
-      */
-      // TODO Fix ContainerAllocator?
-      this.containerAllocator = null;
-      //new RMContainerAllocator(this.requestor,this.context);
-
-      ((Service)this.containerAllocator).init(getConfig());
-      ((Service)this.containerAllocator).start();
-      super.start();
-    }
-
-    @Override
-    public synchronized void stop() {
-      if (containerAllocator != null) {
-        ((Service) this.containerAllocator).stop();
-        super.stop();
-      }
-    }
-
-    @Override
-    public void handle(AMSchedulerEvent event) {
-      this.containerAllocator.handle(event);
-    }
-  }
-
-  public TaskHeartbeatHandler getTaskHeartbeatHandler() {
-    return taskHeartbeatHandler;
-  }
-
-  private final class StagingDirCleaningService extends AbstractService {
-    StagingDirCleaningService() {
-      super(StagingDirCleaningService.class.getName());
-    }
-
-    @Override
-    public synchronized void stop() {
-//      try {
-//        cleanupStagingDir();
-//      } catch (IOException io) {
-//        LOG.error("Failed to cleanup staging dir: ", io);
-//      }
-      super.stop();
-    }
-  }
-
   private class RunningAppContext implements AppContext {
 
     private DAG dag;
@@ -1140,7 +854,6 @@ public class DAGAppMaster extends Compos
 
   }
 
-  @SuppressWarnings("unchecked")
   @Override
   public void start() {
 
@@ -1160,18 +873,6 @@ public class DAGAppMaster extends Compos
 
     // End of creating the job.
 
-    // TODO: JobHistory
-    // Send out an MR AM inited event for this AM and all previous AMs.
-    /*
-    for (AMInfo info : amInfos) {
-      dispatcher.getEventHandler().handle(
-          new JobHistoryEvent(job.getID(), new AMStartedEvent(info
-              .getAppAttemptId(), info.getStartTime(), info.getContainerId(),
-              info.getNodeManagerHost(), info.getNodeManagerPort(), info
-                  .getNodeManagerHttpPort())));
-    }
-    */
-
     // metrics system init is really init & start.
     // It's more test friendly to put it here.
     DefaultMetricsSystem.initialize("MRAppMaster");
@@ -1184,25 +885,6 @@ public class DAGAppMaster extends Compos
     dagEventDispatcher.handle(initDagEvent);
 
 
-    // JobImpl's InitTransition is done (call above is synchronous), so the
-    // "uber-decision" (MR-1220) has been made.  Query job and switch to
-    // ubermode if appropriate (by registering different container-allocator
-    // and container-launcher services/event-handlers).
-
-//    if (dag.isUber()) {
-//      speculatorEventDispatcher.disableSpeculation();
-//      LOG.info("MRAppMaster uberizing job " + dag.getID()
-//               + " in local container (\"uber-AM\") on node "
-//               + nmHost + ":" + nmPort + ".");
-//    } else {
-//      // send init to speculator only for non-uber jobs.
-//      // This won't yet start as dispatcher isn't started yet.
-//      dispatcher.getEventHandler().handle(
-//          new SpeculatorEvent(dag.getID(), clock.getTime()));
-//      LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
-//               + "job " + dag.getID() + ".");
-//    }
-
     //start all the components
     super.start();
 
@@ -1271,33 +953,7 @@ public class DAGAppMaster extends Compos
       ((EventHandler<VertexEvent>) vertex).handle(event);
     }
   }
-
-//  private class SpeculatorEventDispatcher implements
-//      EventHandler<SpeculatorEvent> {
-//    private final Configuration conf;
-//    private volatile boolean disabled;
-//
-//    public SpeculatorEventDispatcher(Configuration config) {
-//      this.conf = config;
-//    }
-//
-//    @Override
-//    public void handle(SpeculatorEvent event) {
-//      if (disabled) {
-//        return;
-//      }
-//
-//      // FIX handle speculation events properly
-//      // if vertex has speculation enabled then handle event else drop it
-//      // speculator.handle(event);
-//    }
-//
-//    public void disableSpeculation() {
-//      disabled = true;
-//    }
-//
-//  }
-
+  
   private static void validateInputParam(String value, String param)
       throws IOException {
     if (value == null) {
@@ -1410,8 +1066,8 @@ public class DAGAppMaster extends Compos
     public void run() {
       LOG.info("DAGAppMaster received a signal. Signaling TaskScheduler and "
         + "JobHistoryEventHandler.");
-      // Notify the JHEH and TaskScheduler that a SIGTERM has been received so
-      // that they don't take too long in shutting down
+      // Notify TaskScheduler that a SIGTERM has been received so
+      // that it doesnt take too long in shutting down
 
       // Signal the task scheduler.
       appMaster.taskSchedulerEventHandler.setSignalled(true);
@@ -1421,13 +1077,6 @@ public class DAGAppMaster extends Compos
         appMaster.state = DAGAppMasterState.KILLED;
       }
 
-      // TODO: JobHistory
-      /*
-      if(appMaster.jobHistoryEventHandler != null) {
-        ((JobHistoryEventHandler2) appMaster.jobHistoryEventHandler)
-            .setSignalled(true);
-      }
-      */
       appMaster.stop();
     }
   }

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java?rev=1476402&r1=1476401&r2=1476402&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
(original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
Fri Apr 26 20:58:00 2013
@@ -863,7 +863,7 @@ public class DAGImpl implements org.apac
       return new VertexImpl(
           vertexId, vertexName, dag.conf,
           dag.eventHandler, dag.taskAttemptListener,
-          dag.jobTokenSecretManager, dag.jobToken, dag.fsTokens, dag.clock,
+          dag.jobToken, dag.fsTokens, dag.clock,
           dag.taskHeartbeatHandler, dag.appContext,
           dag.dagLocationHint.getVertexLocationHint(vertexName));
     }

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java?rev=1476402&r1=1476401&r2=1476402&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
(original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
Fri Apr 26 20:58:00 2013
@@ -116,8 +116,6 @@ public class VertexImpl implements org.a
   //final fields
   private final Clock clock;
 
-  // TODO: Recovery
-  //private final Map<TaskId, TaskInfo> completedTasksFromPreviousRun;
 
   private final Lock readLock;
   private final Lock writeLock;
@@ -317,7 +315,6 @@ public class VertexImpl implements org.a
 
   private Credentials fsTokens;
   private Token<JobTokenIdentifier> jobToken;
-  private JobTokenSecretManager jobTokenSecretManager;
 
   private final TezVertexID vertexId;
 
@@ -325,7 +322,6 @@ public class VertexImpl implements org.a
   private final String processorName;
 
   private Map<Vertex, EdgeProperty> sourceVertices;
-  private int sourcePhysicalEdges = 0;
   private Map<Vertex, EdgeProperty> targetVertices;
   
   private VertexScheduler vertexScheduler;
@@ -339,7 +335,6 @@ public class VertexImpl implements org.a
   public VertexImpl(TezVertexID vertexId, String vertexName,
       TezConfiguration conf, EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener,
-      JobTokenSecretManager jobTokenSecretManager,
       Token<JobTokenIdentifier> jobToken,
       Credentials fsTokenCredentials, Clock clock,
       // TODO: Recovery
@@ -365,7 +360,6 @@ public class VertexImpl implements org.a
     this.writeLock = readWriteLock.writeLock();
 
     this.fsTokens = fsTokenCredentials;
-    this.jobTokenSecretManager = jobTokenSecretManager;
     this.jobToken = jobToken;
     this.committer = new NullVertexOutputCommitter();
     this.vertexLocationHint = vertexLocationHint;
@@ -863,22 +857,6 @@ public class VertexImpl implements org.a
 
     }
 
-    // TODO: Splits
-    /*
-    protected TaskSplitMetaInfo[] createSplits(VertexImpl job, JobId jobId) {
-      TaskSplitMetaInfo[] allTaskSplitMetaInfo;
-      try {
-        allTaskSplitMetaInfo = SplitMetaInfoReader.readSplitMetaInfo(
-            job.oldJobId, job.fs,
-            job.conf,
-            job.remoteJobSubmitDir);
-      } catch (IOException e) {
-        throw new YarnException(e);
-      }
-      return allTaskSplitMetaInfo;
-    }
-    */
-
     /**
      * If the number of tasks are greater than the configured value
      * throw an exception that will fail job initialization



Mime
View raw message