tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject svn commit: r1471222 - in /incubator/tez/branches/TEZ-1: tez-dag-api/src/main/java/org/apache/tez/dag/api/ tez-dag/src/main/java/org/apache/tez/dag/app/ tez-dag/src/main/java/org/apache/tez/dag/app/dag/ tez-dag/src/main/java/org/apache/tez/dag/app/dag/...
Date Wed, 24 Apr 2013 01:31:20 GMT
Author: bikas
Date: Wed Apr 24 01:31:20 2013
New Revision: 1471222

URL: http://svn.apache.org/r1471222
Log:
TEZ-24. Separate DAG conf into TezConf and DAGPlan (bikas)

Modified:
    incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
    incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java
    incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/DAGApps.java
    incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
    incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
    incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java

Modified: incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java?rev=1471222&r1=1471221&r2=1471222&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java Wed Apr 24 01:31:20 2013
@@ -18,11 +18,14 @@
 package org.apache.tez.dag.api;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
 public class DAG { // FIXME rename to Topology
   List<Vertex> vertices;
-  List<Edge> edges;
+  List<Edge> edges;  
+  
+  HashMap<String, String> config = new HashMap<String, String>();
   
   public DAG() {
     this.vertices = new ArrayList<Vertex>();
@@ -58,6 +61,10 @@ public class DAG { // FIXME rename to To
     
     edges.add(edge);
   }
+  
+  public void addConfiguration(String key, String value) {
+    config.put(key, value);
+  }
 
   public void verify() throws TezException { // FIXME better exception
 
@@ -84,6 +91,8 @@ public class DAG { // FIXME rename to To
       }
     }
     
+    dagConf.setConfig(config);
+    
     return dagConf;
   }
   

Modified: incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java?rev=1471222&r1=1471221&r2=1471222&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java Wed Apr 24 01:31:20 2013
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.StringTokenizer;
 import java.util.TreeMap;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -76,37 +77,39 @@ public class DAGConfiguration extends Co
   public final static String EDGE = DAG + "edge.";
 
   private final static String SEPARATOR = "|";
-
-  // FIXME This property onwards should be split into a separate class or the
-  // rest of thie class needs to be converted into a config name list once 
-  // the serialization is changed.
-  
-  // TODO Should not be required once all tokens are handled via AppSubmissionContext
-  public static final String JOB_SUBMIT_DIR = DAG + "jobSubmitDir";
-  public static final String APPLICATION_TOKENS_FILE = "appTokens";
-  
-  public static final String JOB_NAME = DAG + "job.name";
-  public static final String USER_NAME = DAG + "user.name";
-  
-  // TODO Some of the DAG properties are job specific and not AM specific. Rename accordingly.
   
-  // TODO Speculator class should be configurable on a pere vertex level.
-  public static final String DAG_AM_SPECULATOR_CLASS = DAG_AM + "speculator.class";
-  
-  public static final String DAG_AM_TASK_LISTENER_THREAD_COUNT = DAG_AM + "task.listener.thread-count";
-  public static final int DAG_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT = 30;
-  
-  public static final String DAG_AM_STAGING_DIR = DAG_AM + "staging-dir";
-  public static final String DAG_AM_STAGING_DIR_DEFAULT = "/tmp/hadoop-yarn/staging";
+  public final static String TEZ_DAG_CONFIG_KEYS = DAG + "keys";
+  public final static String TEZ_DAG_CONFIG_VALUES = DAG + "values";
+
+  @Private
+  public void setConfig(Map<String, String> config) {
+    if(!config.isEmpty()) {
+      String[] key = new String[config.size()];
+      String[] value = new String[config.size()];
+      int i=0;
+      for(Entry<String, String> entry : config.entrySet()) {
+        key[i] = entry.getKey();
+        value[i] = entry.getValue();
+        i++;
+      }
+      setStrings(TEZ_DAG_CONFIG_KEYS, key);
+      setStrings(TEZ_DAG_CONFIG_VALUES, value);
+    }
+  }
   
-  // TODO Are any of these node blacklisting properties required. (other than for MR compat)
-  public static final String DAG_MAX_TASK_FAILURES_PER_NODE = DAG
-      + "maxtaskfailures.per.node";
-  public static final String DAG_NODE_BLACKLISTING_ENABLED = DAG
-      + "node-blacklisting.enabled";
-  public static final String DAG_NODE_BLACKLISTING_IGNORE_THRESHOLD = DAG
-      + "node-blacklisting.ignore-threshold-node-percent";
-  public static final int DAG_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT = 33;
+  @Private
+  public Map<String, String> getConfig() {
+    HashMap<String, String> config = new HashMap<String, String>();
+    String[] key = getStrings(TEZ_DAG_CONFIG_KEYS);
+    if(key != null) {
+      String[] value = getStrings(TEZ_DAG_CONFIG_VALUES);
+      assert value.length == key.length;
+      for(int i=0; i<key.length; ++i) {
+        config.put(key[i], value[i]);
+      }
+    }
+    return config;
+  }
   
   @Private
   public void setEdgeProperties(List<Edge> edges) {
@@ -121,7 +124,7 @@ public class DAGConfiguration extends Co
   }
 
   public Map<String, EdgeProperty> getEdgeProperties() {
-    String edgeIds[] = getStrings(TEZ_DAG_EDGES);
+    String[] edgeIds = getStrings(TEZ_DAG_EDGES);
     if (edgeIds == null) {
       return new TreeMap<String, EdgeProperty>();
     }

Modified: incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java?rev=1471222&r1=1471221&r2=1471222&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java Wed Apr 24 01:31:20 2013
@@ -23,10 +23,8 @@ import org.apache.hadoop.conf.Configurat
 public class TezConfiguration extends Configuration {
 
   public final static String TEZ_SITE_XML = "tez-site.xml";
-  public final static String TEZ_DEFAULT_XML = "tez-default.xml";
 
   static {
-    addDefaultResource(TEZ_DEFAULT_XML);
     addDefaultResource(TEZ_SITE_XML);
   }
 
@@ -39,7 +37,40 @@ public class TezConfiguration extends Co
   }
 
   public static final String TEZ_PREFIX = "tez.";
-  public static final String DAG_AM_PREFIX = TEZ_PREFIX + "dag,am.";
+  public static final String DAG_AM_PREFIX = TEZ_PREFIX + "dag.am.";
+  
+  public static final String JOB_NAME = TEZ_PREFIX + "job.name";
+  public static final String JOB_NAME_DEFAULT = "TezJob";
+  
+  public static final String USER_NAME = TEZ_PREFIX + "user.name";
+  
+  public static final String DAG_AM_STAGING_DIR = TEZ_PREFIX + "staging-dir";
+  public static final String DAG_AM_STAGING_DIR_DEFAULT = "/tmp/hadoop-yarn/staging";
+  
+  // TODO Should not be required once all tokens are handled via AppSubmissionContext
+  public static final String JOB_SUBMIT_DIR = TEZ_PREFIX + "jobSubmitDir";
+  public static final String APPLICATION_TOKENS_FILE = "appTokens";
+  
+  public static final String DAG_AM_TASK_LISTENER_THREAD_COUNT = 
+                                TEZ_PREFIX + "task.listener.thread-count";
+  public static final int DAG_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT = 30;
+
+  public static final String DAG_AM_CONTAINER_LISTENER_THREAD_COUNT = 
+      TEZ_PREFIX + "container.listener.thread-count";
+  public static final int DAG_AM_CONTAINER_LISTENER_THREAD_COUNT_DEFAULT = 30;
+  
+  // TODO Some of the DAG properties are job specific and not AM specific. Rename accordingly.
+  // TODO Are any of these node blacklisting properties required. (other than for MR compat)
+  public static final String DAG_MAX_TASK_FAILURES_PER_NODE = TEZ_PREFIX
+      + "maxtaskfailures.per.node";
+  public static final int DAG_MAX_TASK_FAILURES_PER_NODE_DEFAULT = 3;
+  public static final String DAG_NODE_BLACKLISTING_ENABLED = TEZ_PREFIX
+      + "node-blacklisting.enabled";
+  public static final boolean DAG_NODE_BLACKLISTING_ENABLED_DEFAULT = true;
+  public static final String DAG_NODE_BLACKLISTING_IGNORE_THRESHOLD = TEZ_PREFIX
+      + "node-blacklisting.ignore-threshold-node-percent";
+  public static final int DAG_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT = 33;
+
 
   public static final String DAG_AM_RESOURCE_MEMORY_MB = DAG_AM_PREFIX
       + "resource.memory.mb";

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java?rev=1471222&r1=1471221&r2=1471222&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java Wed Apr 24 01:31:20 2013
@@ -20,10 +20,9 @@ package org.apache.tez.dag.app;
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -98,9 +97,6 @@ import org.apache.tez.dag.app.rm.contain
 import org.apache.tez.dag.app.rm.container.AMContainerState;
 import org.apache.tez.dag.app.rm.node.AMNodeEventType;
 import org.apache.tez.dag.app.rm.node.AMNodeMap;
-import org.apache.tez.dag.app.speculate.DefaultSpeculator;
-import org.apache.tez.dag.app.speculate.Speculator;
-import org.apache.tez.dag.app.speculate.SpeculatorEvent;
 import org.apache.tez.dag.app.taskclean.TaskCleaner;
 import org.apache.tez.dag.app.taskclean.TaskCleanerImpl;
 import org.apache.tez.engine.common.security.JobTokenSecretManager;
@@ -135,6 +131,7 @@ public class DAGAppMaster extends Compos
   public static final int SHUTDOWN_HOOK_PRIORITY = 30;
 
   private Clock clock;
+  private final DAGConfiguration dagPlan;
   private final long startTime;
   private final long appSubmitTime;
   private String appName;
@@ -150,13 +147,14 @@ public class DAGAppMaster extends Compos
   // TODO Recovery
   //private Map<TezTaskID, TaskInfo> completedTasksFromPreviousRun;
   private AppContext context;
+  private TezConfiguration conf; 
   private Dispatcher dispatcher;
   private ClientService clientService;
   // TODO Recovery
   //private Recovery recoveryServ;
   private ContainerLauncher containerLauncher;
   private TaskCleaner taskCleaner;
-  private Speculator speculator;
+  //private Speculator speculator;
   private ContainerHeartbeatHandler containerHeartbeatHandler;
   private TaskHeartbeatHandler taskHeartbeatHandler;
   private TaskAttemptListener taskAttemptListener;
@@ -169,7 +167,7 @@ public class DAGAppMaster extends Compos
   private VertexEventDispatcher vertexEventDispatcher;
   private AbstractService stagingDirCleanerService;
   private boolean inRecovery = false;
-  private SpeculatorEventDispatcher speculatorEventDispatcher;
+  //private SpeculatorEventDispatcher speculatorEventDispatcher;
   private TaskSchedulerEventHandler taskSchedulerEventHandler;
 
   private DAGLocationHint dagLocationHint;
@@ -181,15 +179,16 @@ public class DAGAppMaster extends Compos
 
   public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
-      long appSubmitTime) {
+      long appSubmitTime, DAGConfiguration dagPlan) {
     this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
-        new SystemClock(), appSubmitTime);
+        new SystemClock(), appSubmitTime, dagPlan);
   }
 
   public DAGAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
-      Clock clock, long appSubmitTime) {
+      Clock clock, long appSubmitTime, DAGConfiguration dagPlan) {
     super(DAGAppMaster.class.getName());
+    this.dagPlan = dagPlan;
     this.clock = clock;
     this.startTime = clock.getTime();
     this.appSubmitTime = appSubmitTime;
@@ -204,18 +203,21 @@ public class DAGAppMaster extends Compos
   }
 
   @Override
-  public void init(final Configuration conf) {
+  public void init(final Configuration tezConf) {
 
+    assert tezConf instanceof TezConfiguration;
+    
+    this.conf = (TezConfiguration) tezConf;
     conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
 
     downloadTokensAndSetupUGI(conf);
-    setupDAGLocationHint(conf);
+    setupDAGLocationHint(dagPlan);
 
     context = new RunningAppContext(conf);
 
     // Job name is the same as the app name util we support DAG of jobs
     // for an app later
-    appName = conf.get(DAGConfiguration.JOB_NAME, "<missing app name>");
+    appName = dagPlan.get(TezConfiguration.JOB_NAME, "<missing app name>");
 
     dagId = new TezDAGID(appAttemptID.getApplicationId(), 1);
 
@@ -300,9 +302,9 @@ public class DAGAppMaster extends Compos
     // TODO TEZ-14
     // speculator = createSpeculator(conf, context);
     // addIfService(speculator);
-    speculatorEventDispatcher = new SpeculatorEventDispatcher(conf);
-    dispatcher.register(Speculator.EventType.class,
-        speculatorEventDispatcher);
+    //speculatorEventDispatcher = new SpeculatorEventDispatcher(conf);
+    //dispatcher.register(Speculator.EventType.class,
+    //    speculatorEventDispatcher);
 
     //    TODO XXX: Rename to NMComm
     //    corresponding service to launch allocated containers via NodeManager
@@ -615,11 +617,11 @@ public class DAGAppMaster extends Compos
   }
 
   /** Create and initialize (but don't start) a single dag. */
-  protected DAG createDAG(Configuration conf) {
+  protected DAG createDAG(DAGConfiguration dagPlan) {
 
     // create single job
     DAG newDag =
-        new DAGImpl(dagId, appAttemptID, conf, dispatcher.getEventHandler(),
+        new DAGImpl(dagId, appAttemptID, conf, dagPlan, dispatcher.getEventHandler(),
             taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
             // TODO Recovery
             //completedTasksFromPreviousRun,
@@ -641,8 +643,8 @@ public class DAGAppMaster extends Compos
    * Obtain the tokens needed by the job and put them in the UGI
    * @param conf
    */
-  protected void downloadTokensAndSetupUGI(Configuration conf) {
-
+  protected void downloadTokensAndSetupUGI(TezConfiguration conf) {
+    // TODO remove - TEZ-71
     try {
       this.currentUser = UserGroupInformation.getCurrentUser();
 
@@ -650,10 +652,10 @@ public class DAGAppMaster extends Compos
         // Read the file-system tokens from the localized tokens-file.
         Path jobSubmitDir =
             FileContext.getLocalFSFileContext().makeQualified(
-                new Path(new File(DAGConfiguration.JOB_SUBMIT_DIR)
+                new Path(new File(TezConfiguration.JOB_SUBMIT_DIR)
                     .getAbsolutePath()));
         Path jobTokenFile =
-            new Path(jobSubmitDir, DAGConfiguration.APPLICATION_TOKENS_FILE);
+            new Path(jobSubmitDir, TezConfiguration.APPLICATION_TOKENS_FILE);
         fsTokens.addAll(Credentials.readTokenStorageFile(jobTokenFile, conf));
         LOG.info("jobSubmitDir=" + jobSubmitDir + " jobTokenFile="
             + jobTokenFile);
@@ -672,7 +674,7 @@ public class DAGAppMaster extends Compos
     }
   }
 
-  protected void setupDAGLocationHint(Configuration conf) {
+  protected void setupDAGLocationHint(DAGConfiguration conf) {
     try {
       String dagLocationHintFile =
           conf.get(DAGConfiguration.DAG_LOCATION_HINT_RESOURCE_FILE,
@@ -707,39 +709,39 @@ public class DAGAppMaster extends Compos
     return new StagingDirCleaningService();
   }
 
-  protected Speculator createSpeculator(Configuration conf, AppContext context) {
-    Class<? extends Speculator> speculatorClass;
-
-    try {
-      speculatorClass
-          // "yarn.mapreduce.job.speculator.class"
-          = conf.getClass(DAGConfiguration.DAG_AM_SPECULATOR_CLASS,
-                          DefaultSpeculator.class,
-                          Speculator.class);
-      Constructor<? extends Speculator> speculatorConstructor
-          = speculatorClass.getConstructor
-               (Configuration.class, AppContext.class);
-      Speculator result = speculatorConstructor.newInstance(conf, context);
-
-      return result;
-    } catch (InstantiationException ex) {
-      LOG.error("Can't make a speculator -- check "
-          + DAGConfiguration.DAG_AM_SPECULATOR_CLASS, ex);
-      throw new YarnException(ex);
-    } catch (IllegalAccessException ex) {
-      LOG.error("Can't make a speculator -- check "
-          + DAGConfiguration.DAG_AM_SPECULATOR_CLASS, ex);
-      throw new YarnException(ex);
-    } catch (InvocationTargetException ex) {
-      LOG.error("Can't make a speculator -- check "
-          + DAGConfiguration.DAG_AM_SPECULATOR_CLASS, ex);
-      throw new YarnException(ex);
-    } catch (NoSuchMethodException ex) {
-      LOG.error("Can't make a speculator -- check "
-          + DAGConfiguration.DAG_AM_SPECULATOR_CLASS, ex);
-      throw new YarnException(ex);
-    }
-  }
+//  protected Speculator createSpeculator(Configuration conf, AppContext context) {
+//    Class<? extends Speculator> speculatorClass;
+//
+//    try {
+//      speculatorClass
+//          // "yarn.mapreduce.job.speculator.class"
+//          = conf.getClass(DAGConfiguration.DAG_AM_SPECULATOR_CLASS,
+//                          DefaultSpeculator.class,
+//                          Speculator.class);
+//      Constructor<? extends Speculator> speculatorConstructor
+//          = speculatorClass.getConstructor
+//               (Configuration.class, AppContext.class);
+//      Speculator result = speculatorConstructor.newInstance(conf, context);
+//
+//      return result;
+//    } catch (InstantiationException ex) {
+//      LOG.error("Can't make a speculator -- check "
+//          + DAGConfiguration.DAG_AM_SPECULATOR_CLASS, ex);
+//      throw new YarnException(ex);
+//    } catch (IllegalAccessException ex) {
+//      LOG.error("Can't make a speculator -- check "
+//          + DAGConfiguration.DAG_AM_SPECULATOR_CLASS, ex);
+//      throw new YarnException(ex);
+//    } catch (InvocationTargetException ex) {
+//      LOG.error("Can't make a speculator -- check "
+//          + DAGConfiguration.DAG_AM_SPECULATOR_CLASS, ex);
+//      throw new YarnException(ex);
+//    } catch (NoSuchMethodException ex) {
+//      LOG.error("Can't make a speculator -- check "
+//          + DAGConfiguration.DAG_AM_SPECULATOR_CLASS, ex);
+//      throw new YarnException(ex);
+//    }
+//  }
 
   protected TaskAttemptListener createTaskAttemptListener(AppContext context,
       TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh) {
@@ -749,19 +751,18 @@ public class DAGAppMaster extends Compos
   }
 
   protected TaskHeartbeatHandler createTaskHeartbeatHandler(AppContext context,
-      Configuration conf) {
+      TezConfiguration conf) {
     TaskHeartbeatHandler thh = new TaskHeartbeatHandler(context, conf.getInt(
-        DAGConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT,
-        DAGConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT));
+        TezConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT,
+        TezConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT));
     return thh;
   }
 
   protected ContainerHeartbeatHandler createContainerHeartbeatHandler(AppContext context,
-      Configuration conf) {
+      TezConfiguration conf) {
     ContainerHeartbeatHandler chh = new ContainerHeartbeatHandler(context, conf.getInt(
-        DAGConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT,
-        DAGConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT));
-    // TODO XXX: Define a CONTAINER_LISTENER_THREAD_COUNT
+        TezConfiguration.DAG_AM_CONTAINER_LISTENER_THREAD_COUNT,
+        TezConfiguration.DAG_AM_CONTAINER_LISTENER_THREAD_COUNT_DEFAULT));
     return chh;
   }
 
@@ -966,13 +967,13 @@ public class DAGAppMaster extends Compos
   private class RunningAppContext implements AppContext {
 
     private DAG dag;
-    private final Configuration conf;
+    private final TezConfiguration conf;
     private final ClusterInfo clusterInfo = new ClusterInfo();
     private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     private final Lock rLock = rwLock.readLock();
     private final Lock wLock = rwLock.writeLock();
 
-    public RunningAppContext(Configuration config) {
+    public RunningAppContext(TezConfiguration config) {
       this.conf = config;
     }
 
@@ -1013,7 +1014,7 @@ public class DAGAppMaster extends Compos
 
     @Override
     public String getUser() {
-      return this.conf.get(DAGConfiguration.USER_NAME);
+      return this.conf.get(TezConfiguration.USER_NAME);
     }
 
     @Override
@@ -1081,7 +1082,7 @@ public class DAGAppMaster extends Compos
     */
     
     // /////////////////// Create the job itself.
-    dag = createDAG(getConfig());
+    dag = createDAG(dagPlan);
 
     // End of creating the job.
 
@@ -1114,19 +1115,19 @@ public class DAGAppMaster extends Compos
     // ubermode if appropriate (by registering different container-allocator
     // and container-launcher services/event-handlers).
 
-    if (dag.isUber()) {
-      speculatorEventDispatcher.disableSpeculation();
-      LOG.info("MRAppMaster uberizing job " + dag.getID()
-               + " in local container (\"uber-AM\") on node "
-               + nmHost + ":" + nmPort + ".");
-    } else {
-      // send init to speculator only for non-uber jobs.
-      // This won't yet start as dispatcher isn't started yet.
-      dispatcher.getEventHandler().handle(
-          new SpeculatorEvent(dag.getID(), clock.getTime()));
-      LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
-               + "job " + dag.getID() + ".");
-    }
+//    if (dag.isUber()) {
+//      speculatorEventDispatcher.disableSpeculation();
+//      LOG.info("MRAppMaster uberizing job " + dag.getID()
+//               + " in local container (\"uber-AM\") on node "
+//               + nmHost + ":" + nmPort + ".");
+//    } else {
+//      // send init to speculator only for non-uber jobs.
+//      // This won't yet start as dispatcher isn't started yet.
+//      dispatcher.getEventHandler().handle(
+//          new SpeculatorEvent(dag.getID(), clock.getTime()));
+//      LOG.info("MRAppMaster launching normal, non-uberized, multi-container "
+//               + "job " + dag.getID() + ".");
+//    }
 
     //start all the components
     super.start();
@@ -1197,31 +1198,31 @@ public class DAGAppMaster extends Compos
     }
   }
 
-  private class SpeculatorEventDispatcher implements
-      EventHandler<SpeculatorEvent> {
-    private final Configuration conf;
-    private volatile boolean disabled;
-
-    public SpeculatorEventDispatcher(Configuration config) {
-      this.conf = config;
-    }
-
-    @Override
-    public void handle(SpeculatorEvent event) {
-      if (disabled) {
-        return;
-      }
-
-      // FIX handle speculation events properly
-      // if vertex has speculation enabled then handle event else drop it
-      // speculator.handle(event);
-    }
-
-    public void disableSpeculation() {
-      disabled = true;
-    }
-
-  }
+//  private class SpeculatorEventDispatcher implements
+//      EventHandler<SpeculatorEvent> {
+//    private final Configuration conf;
+//    private volatile boolean disabled;
+//
+//    public SpeculatorEventDispatcher(Configuration config) {
+//      this.conf = config;
+//    }
+//
+//    @Override
+//    public void handle(SpeculatorEvent event) {
+//      if (disabled) {
+//        return;
+//      }
+//
+//      // FIX handle speculation events properly
+//      // if vertex has speculation enabled then handle event else drop it
+//      // speculator.handle(event);
+//    }
+//
+//    public void disableSpeculation() {
+//      disabled = true;
+//    }
+//
+//  }
 
   private static void validateInputParam(String value, String param)
       throws IOException {
@@ -1260,38 +1261,32 @@ public class DAGAppMaster extends Compos
           containerId.getApplicationAttemptId();
       long appSubmitTime = Long.parseLong(appSubmitTimeStr);
 
-      DAGAppMaster appMaster =
-          new DAGAppMaster(applicationAttemptId, containerId, nodeHostString,
-              Integer.parseInt(nodePortString),
-              Integer.parseInt(nodeHttpPortString), appSubmitTime);
-      ShutdownHookManager.get().addShutdownHook(
-        new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
-
       Options opts = getCliOptions();
       CommandLine cliParser = new GnuParser().parse(opts, args);
 
       // Default to running mr if nothing specified.
-      // TODO change this once the cleint is ready.
+      // TODO change this once the client is ready.
       String type;
-      DAGConfiguration dagConf = null;
+      DAGConfiguration dagPlan = null;
+      TezConfiguration conf = new TezConfiguration(new YarnConfiguration());
       if (cliParser.hasOption(OPT_PREDEFINED)) {
         LOG.info("Running with PreDefined configuration");
         type = cliParser.getOptionValue(OPT_PREDEFINED, "mr");
         LOG.info("Running job type: " + type);
 
         if (type.equals("mr")) {
-          dagConf = (DAGConfiguration)MRRExampleHelper.createDAGConfigurationForMR();
+          dagPlan = (DAGConfiguration)MRRExampleHelper.createDAGConfigurationForMR();
         } else if (type.equals("mrr")) {
-          dagConf = (DAGConfiguration)MRRExampleHelper.createDAGConfigurationForMRR();
+          dagPlan = (DAGConfiguration)MRRExampleHelper.createDAGConfigurationForMRR();
         }
       } else {
-        dagConf = new DAGConfiguration();
-        dagConf.addResource(TezConfiguration.DAG_AM_PLAN_CONFIG_XML);
+        dagPlan = new DAGConfiguration();
+        dagPlan.addResource(TezConfiguration.DAG_AM_PLAN_CONFIG_XML);
       }
 
       LOG.info("XXXX Running a DAG with "
-          + dagConf.getVertices().length + " vertices ");
-      for (String v : dagConf.getVertices()) {
+          + dagPlan.getVertices().length + " vertices ");
+      for (String v : dagPlan.getVertices()) {
         LOG.info("XXXX DAG has vertex " + v);
       }
 
@@ -1301,11 +1296,23 @@ public class DAGAppMaster extends Compos
       // Do not automatically close FileSystem objects so that in case of
       // SIGTERM I have a chance to write out the job history. I'll be closing
       // the objects myself.
-      dagConf.setBoolean("fs.automatic.close", false);
+      conf.setBoolean("fs.automatic.close", false);
+
+      conf.set(TezConfiguration.USER_NAME, jobUserName);
+      
+      Map<String, String> config = dagPlan.getConfig();
+      for(Entry<String, String> entry : config.entrySet()) {
+        conf.set(entry.getKey(), entry.getValue());
+      }
 
-      dagConf.set(DAGConfiguration.USER_NAME, jobUserName);
+      DAGAppMaster appMaster =
+          new DAGAppMaster(applicationAttemptId, containerId, nodeHostString,
+              Integer.parseInt(nodePortString),
+              Integer.parseInt(nodeHttpPortString), appSubmitTime, dagPlan);
+      ShutdownHookManager.get().addShutdownHook(
+        new DAGAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
 
-      initAndStartAppMaster(appMaster, new YarnConfiguration(dagConf),
+      initAndStartAppMaster(appMaster, conf,
           jobUserName);
 
     } catch (Throwable t) {
@@ -1354,7 +1361,7 @@ public class DAGAppMaster extends Compos
 
   // TODO XXX Does this really need to be a YarnConfiguration ?
   protected static void initAndStartAppMaster(final DAGAppMaster appMaster,
-      final YarnConfiguration conf, String jobUserName) throws IOException,
+      final TezConfiguration conf, String jobUserName) throws IOException,
       InterruptedException {
     UserGroupInformation.setConfiguration(conf);
     UserGroupInformation appMasterUgi = UserGroupInformation

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java?rev=1471222&r1=1471221&r2=1471222&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java Wed Apr 24 01:31:20 2013
@@ -21,11 +21,11 @@ package org.apache.tez.dag.app.dag;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.engine.records.TezDAGID;
 import org.apache.tez.engine.records.TezVertexID;
@@ -58,13 +58,10 @@ public interface DAG {
   String getUserName();
   String getQueueName();
   
-  Configuration getConf();
-  
-  /**
-   * @return a path to where the config file for this job is located.
-   */
-  Path getConfFile();
+  TezConfiguration getConf();
   
+  DAGConfiguration getDagPlan();
+
   /**
    * @return the ACLs for this job for each type of JobACL given. 
    */

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java?rev=1471222&r1=1471221&r2=1471222&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java Wed Apr 24 01:31:20 2013
@@ -56,6 +56,7 @@ import org.apache.tez.common.counters.Te
 import org.apache.tez.dag.api.DAGConfiguration;
 import org.apache.tez.dag.api.DAGLocationHint;
 import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.client.impl.TezBuilderUtils;
 import org.apache.tez.dag.app.AppContext;
@@ -133,7 +134,8 @@ public class DAGImpl implements org.apac
   private Object fullCountersLock = new Object();
   private TezCounters fullCounters = null;
 
-  public DAGConfiguration conf;
+  public final TezConfiguration conf;
+  public final DAGConfiguration dagPlan;
 
   //fields initialized in init
   private FileSystem fs;
@@ -312,7 +314,9 @@ public class DAGImpl implements org.apac
   private long finishTime;
 
   public DAGImpl(TezDAGID dagId, ApplicationAttemptId applicationAttemptId,
-      Configuration conf, EventHandler eventHandler,
+      TezConfiguration conf,
+      DAGConfiguration dagPlan,
+      EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener,
       JobTokenSecretManager jobTokenSecretManager,
       Credentials fsTokenCredentials, Clock clock,
@@ -327,8 +331,10 @@ public class DAGImpl implements org.apac
       DAGLocationHint dagLocationHint) {
     this.applicationAttemptId = applicationAttemptId;
     this.dagId = dagId;
-    this.dagName = conf.get(DAGConfiguration.JOB_NAME, "<missing job name>");
-    this.conf = new DAGConfiguration(conf);
+    this.dagName = conf.get(TezConfiguration.JOB_NAME,
+                             TezConfiguration.JOB_NAME_DEFAULT);
+    this.conf = conf;
+    this.dagPlan = dagPlan;
     // TODO Metrics
     //this.metrics = metrics;
     this.clock = clock;
@@ -370,10 +376,16 @@ public class DAGImpl implements org.apac
     return dagId;
   }
 
+  // TODO maybe removed after TEZ-74
   @Override
-  public DAGConfiguration getConf() {
+  public TezConfiguration getConf() {
     return conf;
   }
+  
+  @Override
+  public DAGConfiguration getDagPlan() {
+    return dagPlan;
+  }
 
   EventHandler getEventHandler() {
     return this.eventHandler;
@@ -445,12 +457,6 @@ public class DAGImpl implements org.apac
   public DAGReport getReport() {
     readLock.lock();
     try {
-      DAGState state = getState();
-
-      // jobFile can be null if the job is not yet inited.
-      String jobFile =
-          remoteJobConfFile == null ? "" : remoteJobConfFile.toString();
-
       StringBuilder diagsb = new StringBuilder();
       for (String s : getDiagnostics()) {
         diagsb.append(s).append("\n");
@@ -697,15 +703,6 @@ public class DAGImpl implements org.apac
     return queueName;
   }
 
-  /*
-   * (non-Javadoc)
-   * @see org.apache.hadoop.mapreduce.v2.app2.job.Job#getConfFile()
-   */
-  @Override
-  public Path getConfFile() {
-    return remoteJobConfFile;
-  }
-
   @Override
   public String getName() {
     return dagName;
@@ -829,14 +826,14 @@ public class DAGImpl implements org.apac
         */
 
         // create the vertices
-        String[] vertexNames = job.getConf().getVertices();
+        String[] vertexNames = job.getDagPlan().getVertices();
         job.numVertices = vertexNames.length;
         for (int i=0; i < job.numVertices; ++i) {
           VertexImpl v = createVertex(job, vertexNames[i], i);
           job.addVertex(v);
         }
 
-        job.edgeProperties = job.getConf().getEdgeProperties();
+        job.edgeProperties = job.getDagPlan().getEdgeProperties();
         
         // setup the dag
         for (Vertex v : job.vertices.values()) {
@@ -873,9 +870,9 @@ public class DAGImpl implements org.apac
 
     private void parseVertexEdges(DAGImpl dag, Vertex vertex) {
       String[] inVerticesNames =
-          dag.getConf().getInputVertices(vertex.getName());
+          dag.getDagPlan().getInputVertices(vertex.getName());
       List<String> inEdges =
-          dag.getConf().getInputEdgeIds(vertex.getName());
+          dag.getDagPlan().getInputEdgeIds(vertex.getName());
       Map<Vertex, EdgeProperty> inVertices =
           new HashMap<Vertex, EdgeProperty>();
       for (int i=0; i < inVerticesNames.length; ++i) {
@@ -886,9 +883,9 @@ public class DAGImpl implements org.apac
       vertex.setInputVertices(inVertices);
 
       String[] outVerticesNames =
-          dag.getConf().getOutputVertices(vertex.getName());
+          dag.getDagPlan().getOutputVertices(vertex.getName());
       List<String> outEdges =
-          dag.getConf().getOutputEdgeIds(vertex.getName());
+          dag.getDagPlan().getOutputEdgeIds(vertex.getName());
       Map<Vertex, EdgeProperty> outVertices =
           new HashMap<Vertex, EdgeProperty>();
       for (int i=0; i < outVerticesNames.length; ++i) {
@@ -905,6 +902,7 @@ public class DAGImpl implements org.apac
       
       dagIdString.replace("application", "job");
       
+      // TODO remove - TEZ-71
       String user =
         UserGroupInformation.getCurrentUser().getShortUserName();
       Path path = DAGApps.getStagingAreaDir(job.conf, user);
@@ -915,8 +913,6 @@ public class DAGImpl implements org.apac
       job.remoteJobSubmitDir =
           FileSystem.get(job.conf).makeQualified(
               new Path(path, dagIdString));
-      job.remoteJobConfFile =
-          new Path(job.remoteJobSubmitDir, MRJobConfig.JOB_CONF_FILE);
 
       // Prepare the TaskAttemptListener server for authentication of Containers
       // TaskAttemptListener gets the information via jobTokenSecretManager.

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java?rev=1471222&r1=1471221&r2=1471222&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java Wed Apr 24 01:31:20 2013
@@ -55,7 +55,7 @@ import org.apache.tez.common.TezEngineTa
 import org.apache.tez.common.TezTaskContext;
 import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.client.impl.TezBuilderUtils;
 import org.apache.tez.dag.api.records.TaskAttemptReport;
@@ -108,7 +108,7 @@ public class TaskAttemptImpl implements 
   static final TezCounters EMPTY_COUNTERS = new TezCounters();
   private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable?
 
-  protected final DAGConfiguration conf;
+  protected final TezConfiguration conf;
   protected final Path jobFile;
   protected final int partition;
   @SuppressWarnings("rawtypes")
@@ -252,7 +252,7 @@ public class TaskAttemptImpl implements 
   @SuppressWarnings("rawtypes")
   public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
       TaskAttemptListener tal, Path jobFile, int partition, 
-      DAGConfiguration conf,
+      TezConfiguration conf,
       Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock,
       TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
       String mrxModuleClassName, TaskLocationHint locationHint,
@@ -753,19 +753,19 @@ public class TaskAttemptImpl implements 
 //    }
   }
   
-  private void maybeSendSpeculatorContainerRequired() {
-    if (!speculatorContainerRequestSent) {
-      sendEvent(new SpeculatorEvent(getID().getTaskID(), +1));
-      speculatorContainerRequestSent = true;
-    }
-  }
-
-  private void maybeSendSpeculatorContainerNoLongerRequired() {
-    if (speculatorContainerRequestSent) {
-      sendEvent(new SpeculatorEvent(getID().getTaskID(), -1));
-      speculatorContainerRequestSent = false;
-    }
-  }
+//  private void maybeSendSpeculatorContainerRequired() {
+//    if (!speculatorContainerRequestSent) {
+//      sendEvent(new SpeculatorEvent(getID().getTaskID(), +1));
+//      speculatorContainerRequestSent = true;
+//    }
+//  }
+//
+//  private void maybeSendSpeculatorContainerNoLongerRequired() {
+//    if (speculatorContainerRequestSent) {
+//      sendEvent(new SpeculatorEvent(getID().getTaskID(), -1));
+//      speculatorContainerRequestSent = false;
+//    }
+//  }
   
   private void sendTaskAttemptCleanupEvent() {
 //    TaskAttemptContext taContext = 
@@ -850,7 +850,7 @@ public class TaskAttemptImpl implements 
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
       TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) event;
       // Event to speculator - containerNeeded++
-      ta.maybeSendSpeculatorContainerRequired();
+      //ta.maybeSendSpeculatorContainerRequired();
 
       // TODO Creating the remote task here may not be required in case of
       // recovery.
@@ -980,9 +980,9 @@ public class TaskAttemptImpl implements 
       ta.logJobHistoryAttemptStarted();
 
       // Inform the speculator about the container assignment.
-      ta.maybeSendSpeculatorContainerNoLongerRequired();
+      //ta.maybeSendSpeculatorContainerNoLongerRequired();
       // Inform speculator about startTime
-      ta.sendEvent(new SpeculatorEvent(ta.attemptId, true, ta.launchTime));
+      //ta.sendEvent(new SpeculatorEvent(ta.attemptId, true, ta.launchTime));
 
       // Inform the Task
       ta.sendEvent(new TaskEventTAUpdate(ta.attemptId,
@@ -1007,7 +1007,7 @@ public class TaskAttemptImpl implements 
       ta.sendEvent(new AMSchedulerEventTAEnded(ta, ta.containerId, helper
           .getTaskAttemptState()));
       // Decrement speculator container request.
-      ta.maybeSendSpeculatorContainerNoLongerRequired();
+      //ta.maybeSendSpeculatorContainerNoLongerRequired();
     }
   }
 
@@ -1068,7 +1068,7 @@ public class TaskAttemptImpl implements 
       ta.reportedStatus.taskState = ta.getState();
 
       // Inform speculator of status.
-      ta.sendEvent(new SpeculatorEvent(ta.reportedStatus, ta.clock.getTime()));
+      //ta.sendEvent(new SpeculatorEvent(ta.reportedStatus, ta.clock.getTime()));
 
       ta.updateProgressSplits();
 
@@ -1111,7 +1111,7 @@ public class TaskAttemptImpl implements 
 
       ta.setFinishTime();
       // Inform the speculator.
-      ta.sendEvent(new SpeculatorEvent(ta.reportedStatus, ta.finishTime));
+      //ta.sendEvent(new SpeculatorEvent(ta.reportedStatus, ta.finishTime));
       // Send out history event.
       ta.logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
       ta.sendEvent(createJobCounterUpdateEventSlotMillis(ta));

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java?rev=1471222&r1=1471221&r2=1471222&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java Wed Apr 24 01:31:20 2013
@@ -42,7 +42,7 @@ import org.apache.hadoop.yarn.state.Stat
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.records.TaskAttemptState;
 import org.apache.tez.dag.api.records.TaskReport;
@@ -81,7 +81,7 @@ public class TaskImpl implements Task, E
 
   private static final Log LOG = LogFactory.getLog(TaskImpl.class);
 
-  protected final DAGConfiguration conf;
+  protected final TezConfiguration conf;
   protected final Path jobFile;
   protected final int partition;
   protected final TaskAttemptListener taskAttemptListener;
@@ -264,7 +264,7 @@ public class TaskImpl implements Task, E
   }
 
   public TaskImpl(TezVertexID vertexId, int partition,
-      EventHandler eventHandler, Path remoteJobConfFile, DAGConfiguration conf,
+      EventHandler eventHandler, Path remoteJobConfFile, TezConfiguration conf,
       TaskAttemptListener taskAttemptListener,
       Token<JobTokenIdentifier> jobToken,
       Credentials credentials, Clock clock,

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java?rev=1471222&r1=1471221&r2=1471222&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java Wed Apr 24 01:31:20 2013
@@ -56,6 +56,7 @@ import org.apache.tez.common.InputSpec;
 import org.apache.tez.common.OutputSpec;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.EdgeProperty.ConnectionPattern;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.VertexLocationHint;
@@ -135,7 +136,7 @@ public class VertexImpl implements org.a
   private TezCounters fullCounters = null;
   private Resource taskResource;
 
-  public DAGConfiguration conf;
+  public TezConfiguration conf;
 
   //fields initialized in init
 
@@ -336,7 +337,7 @@ public class VertexImpl implements org.a
   private Map<String, String> environment;
 
   public VertexImpl(TezVertexID vertexId, String vertexName,
-      DAGConfiguration conf, EventHandler eventHandler,
+      TezConfiguration conf, EventHandler eventHandler,
       TaskAttemptListener taskAttemptListener,
       JobTokenSecretManager jobTokenSecretManager,
       Token<JobTokenIdentifier> jobToken,
@@ -369,10 +370,10 @@ public class VertexImpl implements org.a
     this.committer = new NullVertexOutputCommitter();
     this.vertexLocationHint = vertexLocationHint;
     
-    this.taskResource = conf.getVertexResource(getName());
-    this.processorName = conf.getVertexTaskModuleClassName(getName());
-    this.localResources = conf.getVertexLocalResources(getName());
-    this.environment = conf.getVertexEnv(getName());
+    this.taskResource = getDAGPlan().getVertexResource(getName());
+    this.processorName = getDAGPlan().getVertexTaskModuleClassName(getName());
+    this.localResources = getDAGPlan().getVertexLocalResources(getName());
+    this.environment = getDAGPlan().getVertexEnv(getName());
 
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.
@@ -394,9 +395,7 @@ public class VertexImpl implements org.a
   }
 
   @Override
-  public DAGConfiguration getConf() {
-    // TODO TEZ-24 this should be renamed as it is giving global DAG conf
-    // we need a function to give user-land configuration for this vertex
+  public TezConfiguration getConf() {
     return conf;
   }
 
@@ -750,7 +749,7 @@ public class VertexImpl implements org.a
 
         // TODODAGAM
         // TODO: Splits?
-        vertex.numTasks = vertex.conf.getNumVertexTasks(vertex.getName());
+        vertex.numTasks = vertex.getDAGPlan().getNumVertexTasks(vertex.getName());
         /*
         TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
         job.numMapTasks = taskSplitMetaInfo.length;
@@ -824,7 +823,7 @@ public class VertexImpl implements org.a
 
     private void createTasks(VertexImpl vertex) {
       // TODO Fixme
-      DAGConfiguration conf = vertex.getConf();
+      TezConfiguration conf = vertex.getConf();
       boolean useNullLocationHint = true;
       if (vertex.vertexLocationHint != null
           && vertex.vertexLocationHint.getTaskLocationHints() != null
@@ -1272,7 +1271,7 @@ public class VertexImpl implements org.a
 
   @Override
   public TezDAGID getDAGId() {
-    return appContext.getDAGID();
+    return getDAG().getID();
   }
   
   @Override
@@ -1288,6 +1287,10 @@ public class VertexImpl implements org.a
   public DAG getDAG() {
     return appContext.getDAG();
   }
+  
+  DAGConfiguration getDAGPlan() {
+    return getDAG().getDagPlan();
+  }
 
   // TODO Eventually remove synchronization.
   @Override

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java?rev=1471222&r1=1471221&r2=1471222&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java Wed Apr 24 01:31:20 2013
@@ -68,6 +68,7 @@ public class TaskSchedulerEventHandler e
   @SuppressWarnings("rawtypes")
   private final EventHandler eventHandler;
   private final TaskScheduler taskScheduler;
+  // TODO change this to DAGAppMaster
   private DAG job;
   private Map<ApplicationAccessType, String> appAcls = null;
   private Thread eventHandlingThread;
@@ -459,7 +460,8 @@ public class TaskSchedulerEventHandler e
           containerId,
           taskAttempt.getID().getTaskID().getVertexID(),
           event.getJobToken(),
-          event.getCredentials(), false, job.getConf(),
+          // TODO getConf from AMSchedulerEventTALaunchRequest
+          event.getCredentials(), false, job.getConf(), 
           taskAttempt.getLocalResources(),
           taskAttempt.getEnvironment()));
     }

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java?rev=1471222&r1=1471221&r2=1471222&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventLaunchRequest.java Wed Apr 24 01:31:20 2013
@@ -20,11 +20,11 @@ package org.apache.tez.dag.app.rm.contai
 
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.records.TezDAGID;
 import org.apache.tez.engine.records.TezVertexID;
@@ -35,14 +35,14 @@ public class AMContainerEventLaunchReque
   private final Token<JobTokenIdentifier> jobToken;
   private final Credentials credentials;
   private final boolean shouldProfile;
-  private final Configuration conf;
+  private final TezConfiguration conf;
   private final Map<String, LocalResource> localResources;
   private final Map<String, String> environment;
 
   public AMContainerEventLaunchRequest(ContainerId containerId, 
       TezVertexID vertexId,
       Token<JobTokenIdentifier> jobToken,
-      Credentials credentials, boolean shouldProfile, Configuration conf,
+      Credentials credentials, boolean shouldProfile, TezConfiguration conf,
       Map<String, LocalResource> localResources,
       Map<String, String> environment) {
     super(containerId, AMContainerEventType.C_LAUNCH_REQUEST);
@@ -75,7 +75,7 @@ public class AMContainerEventLaunchReque
     return this.shouldProfile;
   }
 
-  public Configuration getConf() {
+  public TezConfiguration getConf() {
     return this.conf;
   }
 

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java?rev=1471222&r1=1471221&r2=1471222&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java Wed Apr 24 01:31:20 2013
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.utils.TezEngineChildJVM;
@@ -87,7 +88,7 @@ public class AMContainerHelpers {
    * @param applicationACLs
    */
   private static ContainerLaunchContext createCommonContainerLaunchContext(
-      Map<ApplicationAccessType, String> applicationACLs, Configuration conf,
+      Map<ApplicationAccessType, String> applicationACLs, TezConfiguration conf,
       Token<JobTokenIdentifier> jobToken,
       TezVertexID vertexId, Credentials credentials) {
 
@@ -138,7 +139,7 @@ public class AMContainerHelpers {
     // The null fields are per-container and will be constructed for each
     // container separately.
     ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext(
-        conf.get(DAGConfiguration.USER_NAME), localResources,
+        conf.get(TezConfiguration.USER_NAME), localResources,
         environment, null, serviceData, taskCredentialsBuffer, applicationACLs);
 
     return container;
@@ -147,7 +148,7 @@ public class AMContainerHelpers {
   @VisibleForTesting
   public static ContainerLaunchContext createContainerLaunchContext(
       Map<ApplicationAccessType, String> acls,
-      ContainerId containerId, Configuration conf, TezVertexID vertexId,
+      ContainerId containerId, TezConfiguration conf, TezVertexID vertexId,
       Token<JobTokenIdentifier> jobToken,
       Resource assignedCapability, Map<String, LocalResource> localResources,
       Map<String, String> vertexEnv,

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java?rev=1471222&r1=1471221&r2=1471222&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeMap.java Wed Apr 24 01:31:20 2013
@@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.AppContext;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -65,12 +65,14 @@ public class AMNodeMap extends AbstractS
   @Override
   public synchronized void init(Configuration conf) {
     this.maxTaskFailuresPerNode = conf.getInt(
-        DAGConfiguration.DAG_MAX_TASK_FAILURES_PER_NODE, 3);
+        TezConfiguration.DAG_MAX_TASK_FAILURES_PER_NODE, 
+        TezConfiguration.DAG_MAX_TASK_FAILURES_PER_NODE_DEFAULT);
     this.nodeBlacklistingEnabled = conf.getBoolean(
-        DAGConfiguration.DAG_NODE_BLACKLISTING_ENABLED, true);
+        TezConfiguration.DAG_NODE_BLACKLISTING_ENABLED,
+        TezConfiguration.DAG_NODE_BLACKLISTING_ENABLED_DEFAULT);
     this.blacklistDisablePercent = conf.getInt(
-          DAGConfiguration.DAG_NODE_BLACKLISTING_IGNORE_THRESHOLD,
-          DAGConfiguration.DAG_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT);
+          TezConfiguration.DAG_NODE_BLACKLISTING_IGNORE_THRESHOLD,
+          TezConfiguration.DAG_NODE_BLACKLISTING_IGNORE_THRESHOLD_DEFAULT);
 
     LOG.info("blacklistDisablePercent is " + blacklistDisablePercent +
         ", blacklistingEnabled: " + nodeBlacklistingEnabled + 

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/DAGApps.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/DAGApps.java?rev=1471222&r1=1471221&r2=1471222&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/DAGApps.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/DAGApps.java Wed Apr 24 01:31:20 2013
@@ -18,16 +18,15 @@
 
 package org.apache.tez.dag.utils;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.tez.dag.api.DAGConfiguration;
+import org.apache.tez.dag.api.TezConfiguration;
 
 public class DAGApps {
 
   private static final String STAGING_CONSTANT = ".staging";
-  public static Path getStagingAreaDir(Configuration conf, String user) {
-    return new Path(conf.get(DAGConfiguration.DAG_AM_STAGING_DIR,
-        DAGConfiguration.DAG_AM_STAGING_DIR_DEFAULT)
+  public static Path getStagingAreaDir(TezConfiguration conf, String user) {
+    return new Path(conf.get(TezConfiguration.DAG_AM_STAGING_DIR,
+        TezConfiguration.DAG_AM_STAGING_DIR_DEFAULT)
         + Path.SEPARATOR + user + Path.SEPARATOR + STAGING_CONSTANT);
   }
   

Modified: incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java?rev=1471222&r1=1471221&r2=1471222&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java (original)
+++ incubator/tez/branches/TEZ-1/tez-dag/src/main/java/org/apache/tez/dag/utils/TezEngineChildJVM.java Wed Apr 24 01:31:20 2013
@@ -71,7 +71,7 @@ public class TezEngineChildJVM {
         filter.toString();
   }
   
-  public static void setVMEnv(Map<String, String> environment, Configuration conf,
+  public static void setVMEnv(Map<String, String> environment, TezConfiguration conf,
       TezVertexID vertexId, AppContext appContext) {
 
     // FIXME this should be derivable from the container id set by the NM
@@ -81,7 +81,7 @@ public class TezEngineChildJVM {
   }
 
   public static List<String> getVMCommand(
-      InetSocketAddress taskAttemptListenerAddr, Configuration conf, 
+      InetSocketAddress taskAttemptListenerAddr, TezConfiguration conf, 
       TezVertexID vertexId, 
       ContainerId containerId, ApplicationId jobID, boolean shouldProfile) {
 

Modified: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java?rev=1471222&r1=1471221&r2=1471222&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java (original)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java Wed Apr 24 01:31:20 2013
@@ -25,7 +25,7 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.dag.api.DAGConfiguration;
-
+import org.apache.tez.dag.api.TezConfiguration;
 
 
 public class DeprecatedKeys {
@@ -41,28 +41,28 @@ public class DeprecatedKeys {
     addDeprecatedKeys();
     
     mrParamToDAGParamMap.put(MRJobConfig.JOB_SUBMIT_DIR,
-        DAGConfiguration.JOB_SUBMIT_DIR);
+        TezConfiguration.JOB_SUBMIT_DIR);
     mrParamToDAGParamMap.put(MRJobConfig.APPLICATION_TOKENS_FILE,
-        DAGConfiguration.APPLICATION_TOKENS_FILE);
+        TezConfiguration.APPLICATION_TOKENS_FILE);
 
-    mrParamToDAGParamMap.put(MRJobConfig.JOB_NAME, DAGConfiguration.JOB_NAME);
+    mrParamToDAGParamMap.put(MRJobConfig.JOB_NAME, TezConfiguration.JOB_NAME);
 
-    mrParamToDAGParamMap.put(MRJobConfig.MR_AM_JOB_SPECULATOR,
-        DAGConfiguration.DAG_AM_SPECULATOR_CLASS);
+//    mrParamToDAGParamMap.put(MRJobConfig.MR_AM_JOB_SPECULATOR,
+//        TezConfiguration.DAG_AM_SPECULATOR_CLASS);
 
     // TODO Default value handling.
     mrParamToDAGParamMap.put(MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
-        DAGConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT);
+        TezConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT);
     
-    mrParamToDAGParamMap.put(MRJobConfig.USER_NAME, DAGConfiguration.USER_NAME);
+    mrParamToDAGParamMap.put(MRJobConfig.USER_NAME, TezConfiguration.USER_NAME);
     
     mrParamToDAGParamMap.put(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER,
-        DAGConfiguration.DAG_MAX_TASK_FAILURES_PER_NODE);
+        TezConfiguration.DAG_MAX_TASK_FAILURES_PER_NODE);
     mrParamToDAGParamMap.put(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE,
-        DAGConfiguration.DAG_NODE_BLACKLISTING_ENABLED);
+        TezConfiguration.DAG_NODE_BLACKLISTING_ENABLED);
     mrParamToDAGParamMap.put(
         MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT,
-        DAGConfiguration.DAG_NODE_BLACKLISTING_IGNORE_THRESHOLD);
+        TezConfiguration.DAG_NODE_BLACKLISTING_IGNORE_THRESHOLD);
   }
 
   // TODO TEZAM4 Sometime, make sure this gets loaded by default. Insteaf of the current initialization in MRAppMaster, TezChild.

Modified: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java?rev=1471222&r1=1471221&r2=1471222&view=diff
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java (original)
+++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java Wed Apr 24 01:31:20 2013
@@ -682,13 +682,13 @@ public class YARNRunner implements Clien
     }
   }
 
-  private void setDAGParamsFromMRConf(DAGConfiguration dagConf) {
+  private void setDAGParamsFromMRConf(DAG dag) {
     Configuration mrConf = this.conf;
     Map<String, String> mrParamToDAGParamMap = DeprecatedKeys.getMRToDAGParamMap();
     for (Entry<String, String> entry : mrParamToDAGParamMap.entrySet()) {
       if (mrConf.get(entry.getKey()) != null) {
         LOG.info("DEBUG: MR->DAG Setting new key: " + entry.getValue());
-        dagConf.set(entry.getValue(), mrConf.get(entry.getKey()));
+        dag.addConfiguration(entry.getValue(), mrConf.get(entry.getKey()));
       }
     }
   }
@@ -776,9 +776,10 @@ public class YARNRunner implements Clien
 
     localResources.putAll(jobLocalResources);
 
+    setDAGParamsFromMRConf(dag);
+
     // FIXME add serialized dag conf
     DAGConfiguration dagConf = dag.serializeDag();
-    setDAGParamsFromMRConf(dagConf);
     
     Path dagConfFilePath = new Path(jobSubmitDir,
         TezConfiguration.DAG_AM_PLAN_CONFIG_XML);



Mime
View raw message