tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject svn commit: r1470087 - in /incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app: ./ dag/impl/ rm/ rm/container/
Date Fri, 19 Apr 2013 23:58:10 GMT
Author: bikas
Date: Fri Apr 19 23:58:09 2013
New Revision: 1470087

URL: http://svn.apache.org/r1470087
Log:
TEZ-54. Remove FIXME's and link them to TEZ jiras (bikas)

Modified:
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java?rev=1470087&r1=1470086&r2=1470087&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
(original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
Fri Apr 19 23:58:09 2013
@@ -24,8 +24,6 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.locks.Lock;
@@ -76,8 +74,6 @@ import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.client.impl.TezBuilderUtils;
-import org.apache.tez.dag.api.records.AMInfo;
 import org.apache.tez.dag.app.client.ClientService;
 import org.apache.tez.dag.app.client.impl.TezClientService;
 import org.apache.tez.dag.app.dag.DAG;
@@ -165,7 +161,6 @@ public class DAGAppMaster extends Compos
   //protected final MRAppMetrics metrics;
   // TODO Recovery
   //private Map<TezTaskID, TaskInfo> completedTasksFromPreviousRun;
-  private List<AMInfo> amInfos;
   private AppContext context;
   private Dispatcher dispatcher;
   private ClientService clientService;
@@ -181,7 +176,7 @@ public class DAGAppMaster extends Compos
       new JobTokenSecretManager();
   // TODODAGAM Define DAGID
   private TezDAGID dagId;
-//  private boolean newApiCommitter;
+  //  private boolean newApiCommitter;
   private DagEventDispatcher dagEventDispatcher;
   private VertexEventDispatcher vertexEventDispatcher;
   private AbstractService stagingDirCleanerService;
@@ -191,10 +186,6 @@ public class DAGAppMaster extends Compos
 
   private DAGLocationHint dagLocationHint;
 
-  // FIXME need to remove requestor and allocator
-  // private ContainerRequestor containerRequestor;
-  // private ContainerAllocator amScheduler;
-
 
   private DAG dag;
   private Credentials fsTokens = new Credentials(); // Filled during init
@@ -296,7 +287,7 @@ public class DAGAppMaster extends Compos
     taskCleaner = createTaskCleaner(context);
     addIfService(taskCleaner);
 
-    // FIXME TODO DAGClient
+    // TODO TEZ-9
     //service to handle requests from JobClient
     clientService = new TezClientService();
     addIfService(clientService);
@@ -320,7 +311,7 @@ public class DAGAppMaster extends Compos
         new TaskAttemptEventDispatcher());
     dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
 
-    // FIXME handle speculation
+    // TODO TEZ-14
     // speculator = createSpeculator(conf, context);
     // addIfService(speculator);
     speculatorEventDispatcher = new SpeculatorEventDispatcher(conf);
@@ -334,18 +325,6 @@ public class DAGAppMaster extends Compos
     addIfService(containerLauncher);
     dispatcher.register(NMCommunicatorEventType.class, containerLauncher);
 
-    // service to allocate containers from RM (if non-uber) or to fake it (uber)
-    /*
-    // FIXME remove requestor and allocator
-    containerRequestor = createContainerRequestor(clientService, context);
-    addIfService(containerRequestor);
-    dispatcher.register(RMCommunicatorEventType.class, containerRequestor);
-
-    amScheduler = createAMScheduler(containerRequestor, context);
-    addIfService(amScheduler);
-    dispatcher.register(AMSchedulerEventType.class, amScheduler);
-    */
-
     taskSchedulerEventHandler = new TaskSchedulerEventHandler(context,
         clientService);
     addIfService(taskSchedulerEventHandler);
@@ -848,10 +827,6 @@ public class DAGAppMaster extends Compos
   }
   */
 
-  public List<AMInfo> getAllAMInfos() {
-    return amInfos;
-  }
-
   public ContainerLauncher getContainerLauncher() {
     return containerLauncher;
   }
@@ -1118,16 +1093,7 @@ public class DAGAppMaster extends Compos
       amInfos = recoveryServ.getAMInfos();
     }
     */
-
-    // / Create the AMInfo for the current AppMaster
-    if (amInfos == null) {
-      amInfos = new LinkedList<AMInfo>();
-    }
-    AMInfo amInfo =
-        TezBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
-            nmPort, nmHttpPort);
-    amInfos.add(amInfo);
-
+    
     // /////////////////// Create the job itself.
     dag = createDAG(getConfig());
 

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java?rev=1470087&r1=1470086&r2=1470087&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
(original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
Fri Apr 19 23:58:09 2013
@@ -303,7 +303,7 @@ public class TaskAttemptImpl implements 
   TezTask createRemoteTask() {
     Vertex vertex = getTask().getVertex();
 
-    // FIXME user and jobname
+    // TODO  TEZ-50 user and jobname
     return new TezEngineTask(getID(), "user", "jobname", getTask()
         .getVertex().getName(), mrxModuleClassName,
         vertex.getInputSpecList(), vertex.getOutputSpecList());

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java?rev=1470087&r1=1470086&r2=1470087&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
(original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
Fri Apr 19 23:58:09 2013
@@ -288,7 +288,7 @@ public class TaskImpl implements Task, E
     readLock = readWriteLock.readLock();
     writeLock = readWriteLock.writeLock();
     this.attempts = Collections.emptyMap();
-    // FIXME get from conf or API
+    // TODO TEZ-47 get from conf or API
     maxAttempts = 4;
     taskId = new TezTaskID(vertexId, partition);
     this.partition = partition;
@@ -616,7 +616,6 @@ public class TaskImpl implements Task, E
   }
 
   TaskAttemptImpl createAttempt(int attemptNumber) {
-    // FIXME TODODAGAM - implement.
     return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
         taskAttemptListener, null, 0, conf, committer,
         jobToken, credentials, clock, taskHeartbeatHandler,

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java?rev=1470087&r1=1470086&r2=1470087&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
(original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
Fri Apr 19 23:58:09 2013
@@ -393,7 +393,7 @@ public class VertexImpl implements org.a
 
   @Override
   public DAGConfiguration getConf() {
-    // FIXME this should be renamed as it is giving global DAG conf
+    // TODO TEZ-24 this should be renamed as it is giving global DAG conf
     // we need a function to give user-land configuration for this vertex
     return conf;
   }
@@ -679,8 +679,8 @@ public class VertexImpl implements org.a
   }
 
   static VertexState checkVertexCompleteSuccess(VertexImpl vertex) {
-    // FIXME this vertex is definitely buggy as completed includes killed/failed
-    // check for vertex success
+    // TODO TEZ-39 this vertex is definitely buggy as completed includes
+    // killed/failed check for vertex success
     if (vertex.completedTaskCount == vertex.tasks.size()) {
       if (vertex.failedTaskCount > 0) {
         try {
@@ -760,7 +760,7 @@ public class VertexImpl implements org.a
 
         checkTaskLimits();
 
-        // FIXME should depend on source num tasks
+        // TODO should depend on source num tasks
         vertex.sourceTaskAttemptCompletionEvents =
             new ArrayList<TezDependentTaskCompletionEvent>(vertex.numTasks + 10);
 
@@ -769,13 +769,9 @@ public class VertexImpl implements org.a
         
         
 
-        // FIXME this only works if all edges are bipartite
         boolean hasBipartite = false;
         if (vertex.sourceVertices != null) {
           for (EdgeProperty edgeProperty : vertex.sourceVertices.values()) {
-            // FIXME The init needs to be in
-            // topo sort order of graph or else source may not be initialized.
-            // Also should not depend on assumption of single-threaded dispatcher
             if(edgeProperty.getConnectionPattern() == ConnectionPattern.BIPARTITE) {
               hasBipartite = true;
               break;
@@ -787,18 +783,20 @@ public class VertexImpl implements org.a
           // setup vertex scheduler
           // TODO this needs to consider data size and perhaps API. 
           // Currently implicitly BIPARTITE is the only edge type
+          // TODO TEZ-40 config from TeztConfiguration          
           vertex.vertexScheduler = new 
               BipartiteSlowStartVertexScheduler(vertex,
-                                                0.5f,  // FIXME get from config
-                                                0.8f); // FIXME get from config
+                                                0.5f,
+                                                0.8f);
         } else {
           // schedule all tasks upon vertex start
           vertex.vertexScheduler = new ImmediateStartVertexScheduler(vertex);
         }
 
         // FIXME how do we decide vertex needs a committer?
+        // Answer: Do commit for every vertex
         // for now, only for leaf vertices
-        // FIXME make commmitter type configurable per vertex
+        // TODO TEZ-41 make commmitter type configurable per vertex
         if (vertex.targetVertices.isEmpty()) {
           vertex.committer = new MRVertexOutputCommitter();
         }
@@ -1027,7 +1025,7 @@ public class VertexImpl implements org.a
       //eventId is equal to index in the arraylist
       tce.setEventId(vertex.sourceTaskAttemptCompletionEvents.size());
       vertex.sourceTaskAttemptCompletionEvents.add(tce);
-      // FIXME this needs to be ordered/grouped by source vertices or else 
+      // TODO this needs to be ordered/grouped by source vertices or else 
       // my tasks will not know which events are for which vertices' tasks. This 
       // differentiation was not needed for MR because there was only 1 M stage.
       // if the tce is sent to the task then a solution could be to add vertex 
@@ -1059,7 +1057,7 @@ public class VertexImpl implements org.a
       TezDependentTaskCompletionEvent tce =
         ((VertexEventTaskAttemptCompleted) event).getCompletionEvent();
 
-      // FIXME this should only be sent for successful events? looks like all 
+      // TODO this should only be sent for successful events? looks like all 
       // need to be sent in the existing shuffle code
       // Notify all target vertices
       if (vertex.targetVertices != null) {
@@ -1113,7 +1111,7 @@ public class VertexImpl implements org.a
 
     @Override
     public VertexState transition(VertexImpl vertex, VertexEvent event) {
-      vertex.completedTaskCount++;//FIXME this is a bug
+      vertex.completedTaskCount++;// TEZ-39 this is a bug
       LOG.info("Num completed Tasks: " + vertex.completedTaskCount);
       VertexEventTaskCompleted taskEvent = (VertexEventTaskCompleted) event;
       Task task = vertex.tasks.get(taskEvent.getTaskID());

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java?rev=1470087&r1=1470086&r2=1470087&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
(original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
Fri Apr 19 23:58:09 2013
@@ -188,7 +188,7 @@ public class TaskScheduler extends Abstr
     // upcall to app outside of locks
     AppFinalStatus status = appClient.getFinalAppStatus();
     try {
-      // FIXME make this optional for the reboot case
+      // TODO TEZ-36 dont unregister automatically after reboot sent by RM
       synchronized (this) {
         amRmClient.unregisterApplicationMaster(status.exitStatus, 
                                                status.exitMessage,
@@ -426,7 +426,7 @@ public class TaskScheduler extends Abstr
   }
   
   private void addTaskRequest(Object task, ContainerRequest<CRCookie> request) {
-    // FIXME duplicates
+    // TODO TEZ-37 fix duplicate handling
     taskRequests.put(task, request);
     amRmClient.addContainerRequest(request);
   }

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java?rev=1470087&r1=1470086&r2=1470087&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
(original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
Fri Apr 19 23:58:09 2013
@@ -487,9 +487,7 @@ public class TaskSchedulerEventHandler e
   public synchronized void appRebootRequested() {
     // This can happen if the RM has been restarted. If it is in that state,
     // this application must clean itself up.
-    // TODO change event to REBOOT
-    // FIXME appReboot != dagReboot
-    // TODO handle multiple dags - Answer: this is shared across dags
+    // TODO TEZ-34 change event to reboot and send to app master
     sendEvent(new DAGEvent(appContext.getDAGID(),
                            DAGEventType.INTERNAL_ERROR));
     throw new YarnException("ResourceManager requests reboot for: "
@@ -545,7 +543,7 @@ public class TaskSchedulerEventHandler e
 
   @Override
   public void onError(Exception e) {
-    // TODO Possibly wait for some time and then stop
+    // TODO TEZ-35 handle error
   }
 
 }

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java?rev=1470087&r1=1470086&r2=1470087&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
(original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
Fri Apr 19 23:58:09 2013
@@ -41,7 +41,6 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -90,7 +89,7 @@ public class AMContainerHelpers {
   private static ContainerLaunchContext createCommonContainerLaunchContext(
       Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
       Token<JobTokenIdentifier> jobToken,
-      ApplicationId appId, TezVertexID vertexId, Credentials credentials) {
+      TezVertexID vertexId, Credentials credentials) {
 
     // Application resources
     Map<String, LocalResource> localResources =
@@ -145,10 +144,9 @@ public class AMContainerHelpers {
     return container;
   }
 
-  // FIXME does CLC need to work based off DAG id or App Id?
   @VisibleForTesting
   public static ContainerLaunchContext createContainerLaunchContext(
-      Map<ApplicationAccessType, String> applicationACLs,
+      Map<ApplicationAccessType, String> acls,
       ContainerId containerId, JobConf jobConf, TezVertexID vertexId,
       Token<JobTokenIdentifier> jobToken,
       Resource assignedCapability, Map<String, LocalResource> localResources,
@@ -159,9 +157,7 @@ public class AMContainerHelpers {
     synchronized (commonContainerSpecLock) {
       if (commonContainerSpec == null) {
         commonContainerSpec = createCommonContainerLaunchContext(
-            applicationACLs, jobConf, jobToken,
-            vertexId.getDAGId().getApplicationId(),
-            vertexId, credentials);
+            acls, jobConf, jobToken, vertexId, credentials);
       }
     }
 
@@ -173,12 +169,11 @@ public class AMContainerHelpers {
     lResources.putAll(localResources);
 
     // Setup environment by cloning from common env.
-    // FIXME common env is empty
-    // MRChildJVM2.setEnv should become a no-op
     Map<String, String> env = commonContainerSpec.getEnvironment();
     Map<String, String> myEnv = new HashMap<String, String>(env.size());
     myEnv.putAll(env);
     myEnv.putAll(vertexEnv);
+    // TODO TEZ-38 MRChildJVM2.setEnv should become a no-op
     MapReduceChildJVM2.setVMEnv(myEnv, jobConf, vertexId);
 
     // Set up the launch command
@@ -197,7 +192,7 @@ public class AMContainerHelpers {
     ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
         commonContainerSpec.getUser(), lResources, myEnv, commands,
         myServiceData, commonContainerSpec.getContainerTokens().duplicate(),
-        applicationACLs);
+        acls);
 
     return container;
   }



Mime
View raw message