tez-commits mailing list archives

From: acmur...@apache.org
Subject: svn commit: r1457129 [13/38] - in /incubator/tez: ./ tez-ampool/ tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/ tez-ampool/src/main/conf/ tez-ampool/src/main/java/ tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...
Date: Fri, 15 Mar 2013 21:26:48 GMT
Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,932 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.api.Task;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.mapreduce.hadoop.ContainerContext;
+import org.apache.tez.mapreduce.hadoop.ContainerTask;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
+import org.apache.tez.mapreduce.hadoop.records.ProceedToCompletionResponse;
+import org.apache.tez.mapreduce.task.InitialTask;
+import org.apache.tez.mapreduce.task.LocalFinalTask;
+import org.apache.tez.mapreduce.task.impl.MRTaskContext;
+import org.apache.tez.records.TezJobID;
+import org.apache.tez.records.TezTaskAttemptID;
+import org.apache.tez.records.TezTaskDependencyCompletionEventsUpdate;
+import org.apache.tez.records.OutputContext;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+/** Implements MapReduce locally, in-process, for debugging. */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class LocalJobRunner implements ClientProtocol {
+  public static final Log LOG =
+    LogFactory.getLog(LocalJobRunner.class);
+
+  /** The maximum number of map tasks to run in parallel in LocalJobRunner */
+  public static final String LOCAL_MAX_MAPS =
+    "mapreduce.local.map.tasks.maximum";
+
+  private FileSystem fs;
+  private HashMap<JobID, Job> jobs = new HashMap<JobID, Job>();
+  private JobConf conf;
+  private AtomicInteger map_tasks = new AtomicInteger(0);
+  private int reduce_tasks = 0;
+  final Random rand = new Random();
+  
+  private LocalJobRunnerMetrics myMetrics = null;
+
+  private static final String jobDir =  "localRunner/";
+
+  private static final TezCounters EMPTY_COUNTERS = new TezCounters();
+
+  public long getProtocolVersion(String protocol, long clientVersion) {
+    return ClientProtocol.versionID;
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignature.getProtocolSignature(
+        this, protocol, clientVersion, clientMethodsHash);
+  }
+
+  private class Job extends Thread implements TezTaskUmbilicalProtocol {
+    // The job directory on the system: JobClient places job configurations here.
+    // This is analogous to JobTracker's system directory.
+    private Path systemJobDir;
+    private Path systemJobFile;
+    
+    // The job directory for the task.  Analogous to a task's job directory.
+    private Path localJobDir;
+    private Path localJobFile;
+
+    private JobID id;
+    private JobConf job;
+
+    private int numMapTasks;
+    private float [] partialMapProgress;
+    private TezCounters [] mapCounters;
+    private TezCounters reduceCounters;
+
+    private JobStatus status;
+    private List<TaskAttemptID> mapIds = Collections.synchronizedList(
+        new ArrayList<TaskAttemptID>());
+
+    private JobProfile profile;
+    private FileSystem localFs;
+    boolean killed = false;
+    
+    private LocalDistributedCacheManager localDistributedCacheManager;
+
+    public long getProtocolVersion(String protocol, long clientVersion) {
+      return TaskUmbilicalProtocol.versionID;
+    }
+    
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol,
+        long clientVersion, int clientMethodsHash) throws IOException {
+      return ProtocolSignature.getProtocolSignature(
+          this, protocol, clientVersion, clientMethodsHash);
+    }
+
+    public Job(JobID jobid, String jobSubmitDir) throws IOException {
+      this.systemJobDir = new Path(jobSubmitDir);
+      this.systemJobFile = new Path(systemJobDir, "job.xml");
+      this.id = jobid;
+      JobConf conf = new JobConf(systemJobFile);
+      this.localFs = FileSystem.getLocal(conf);
+      this.localJobDir = localFs.makeQualified(conf.getLocalPath(jobDir));
+      this.localJobFile = new Path(this.localJobDir, id + ".xml");
+
+      // Manage the distributed cache.  If there are files to be copied,
+      // this will trigger localJobFile to be re-written below.
+      localDistributedCacheManager = new LocalDistributedCacheManager();
+      localDistributedCacheManager.setup(conf);
+      
+      // Write out configuration file.  Instead of copying it from
+      // systemJobFile, we re-write it, since setup(), above, may have
+      // updated it.
+      OutputStream out = localFs.create(localJobFile);
+      try {
+        conf.writeXml(out);
+      } finally {
+        out.close();
+      }
+      this.job = new JobConf(localJobFile);
+
+      // Job (the current object) is a Thread, so we wrap its class loader.
+      if (localDistributedCacheManager.hasLocalClasspaths()) {
+        setContextClassLoader(localDistributedCacheManager.makeClassLoader(
+                getContextClassLoader()));
+      }
+      
+      profile = new JobProfile(job.getUser(), id, systemJobFile.toString(), 
+                               "http://localhost:8080/", job.getJobName());
+      status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING, 
+          profile.getUser(), profile.getJobName(), profile.getJobFile(), 
+          profile.getURL().toString());
+
+      jobs.put(id, this);
+
+      this.start();
+    }
+
+    /**
+     * A Runnable instance that handles a map task to be run by an executor.
+     */
+    protected class MapTaskRunnable implements Runnable {
+      private final int taskId;
+      private final TaskSplitMetaInfo info;
+      private final JobID jobId;
+      private final JobConf localConf;
+
+      // This is a reference to a shared object passed in by the
+      // external context; this delivers state to the reducers regarding
+      // where to fetch mapper outputs.
+      private final Map<TaskAttemptID, TezTaskOutput> mapOutputFiles;
+
+      public volatile Throwable storedException;
+
+      public MapTaskRunnable(TaskSplitMetaInfo info, int taskId, JobID jobId,
+          Map<TaskAttemptID, TezTaskOutput> mapOutputFiles) {
+        this.info = info;
+        this.taskId = taskId;
+        this.mapOutputFiles = mapOutputFiles;
+        this.jobId = jobId;
+        this.localConf = new JobConf(job);
+      }
+
+      public void run() {
+        try {
+          TaskAttemptID mapId = new TaskAttemptID(new TaskID(
+              jobId, TaskType.MAP, taskId), 0);
+          LOG.info("Starting task: " + mapId);
+          final String user = 
+              UserGroupInformation.getCurrentUser().getShortUserName();
+          setupChildMapredLocalDirs(mapId, user, localConf);
+          localConf.setUser(user);
+
+          TezTaskAttemptID tezMapId =
+              IDConverter.fromMRTaskAttemptId(mapId);
+          mapIds.add(mapId);
+          MRTaskContext taskContext = 
+              new MRTaskContext(
+                  tezMapId, user, localConf.getJobName(),
+                  InitialTask.class.getName(), null, 
+                  info.getSplitIndex().getSplitLocation(), 
+                  info.getSplitIndex().getStartOffset(), 0);
+          Injector injector = Guice.createInjector(new InitialTask());
+
+          TezTaskOutput mapOutput = new TezLocalTaskOutputFiles();
+          mapOutput.setConf(localConf);
+          mapOutputFiles.put(mapId, mapOutput);
+
+          try {
+            map_tasks.getAndIncrement();
+            myMetrics.launchMap(mapId);
+            TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
+            Task t = factory.createTask(taskContext);
+            t.initialize(localConf, Job.this);
+            t.run();
+            myMetrics.completeMap(mapId);
+          } finally {
+            map_tasks.getAndDecrement();
+          }
+
+          LOG.info("Finishing task: " + mapId);
+        } catch (Throwable e) {
+          this.storedException = e;
+        }
+      }
+    }
+
+    /**
+     * Create Runnables to encapsulate map tasks for use by the executor
+     * service.
+     * @param taskInfo Info about the map task splits
+     * @param jobId the job id
+     * @param mapOutputFiles a mapping from task attempts to output files
+     * @return a List of Runnables, one per map task.
+     */
+    protected List<MapTaskRunnable> getMapTaskRunnables(
+        TaskSplitMetaInfo [] taskInfo, JobID jobId,
+        Map<TaskAttemptID, TezTaskOutput> mapOutputFiles) {
+
+      int numTasks = 0;
+      ArrayList<MapTaskRunnable> list = new ArrayList<MapTaskRunnable>();
+      for (TaskSplitMetaInfo task : taskInfo) {
+        list.add(new MapTaskRunnable(task, numTasks++, jobId,
+            mapOutputFiles));
+      }
+
+      return list;
+    }
+
+    /**
+     * Initialize the counters that will hold partial-progress from
+     * the various task attempts.
+     * @param numMaps the number of map tasks in this job.
+     */
+    private synchronized void initCounters(int numMaps) {
+      // Initialize state trackers for all map tasks.
+      this.partialMapProgress = new float[numMaps];
+      this.mapCounters = new TezCounters[numMaps];
+      for (int i = 0; i < numMaps; i++) {
+        this.mapCounters[i] = EMPTY_COUNTERS;
+      }
+
+      this.reduceCounters = EMPTY_COUNTERS;
+    }
+
+    /**
+     * Creates the executor service used to run map tasks.
+     *
+     * @param numMapTasks the total number of map tasks to be run
+     * @return an ExecutorService instance that handles map tasks
+     */
+    protected ExecutorService createMapExecutor(int numMapTasks) {
+
+      // Determine the size of the thread pool to use
+      int maxMapThreads = job.getInt(LOCAL_MAX_MAPS, 1);
+      if (maxMapThreads < 1) {
+        throw new IllegalArgumentException(
+            "Configured " + LOCAL_MAX_MAPS + " must be >= 1");
+      }
+      this.numMapTasks = numMapTasks;
+      maxMapThreads = Math.min(maxMapThreads, this.numMapTasks);
+      maxMapThreads = Math.max(maxMapThreads, 1); // In case of no tasks.
+
+      initCounters(this.numMapTasks);
+
+      LOG.debug("Starting thread pool executor.");
+      LOG.debug("Max local threads: " + maxMapThreads);
+      LOG.debug("Map tasks to process: " + this.numMapTasks);
+
+      // Create a new executor service to drain the work queue.
+      ThreadFactory tf = new ThreadFactoryBuilder()
+        .setNameFormat("LocalJobRunner Map Task Executor #%d")
+        .build();
+      ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads, tf);
+
+      return executor;
+    }
+
+    private org.apache.hadoop.mapreduce.OutputCommitter 
+    createOutputCommitter(boolean newApiCommitter, JobID jobId, Configuration conf) throws Exception {
+      org.apache.hadoop.mapreduce.OutputCommitter committer = null;
+
+      LOG.info("OutputCommitter set in config "
+          + conf.get("mapred.output.committer.class"));
+
+      if (newApiCommitter) {
+        org.apache.hadoop.mapreduce.TaskID taskId =
+            new org.apache.hadoop.mapreduce.TaskID(jobId, TaskType.MAP, 0);
+        org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID =
+            new org.apache.hadoop.mapreduce.TaskAttemptID(taskId, 0);
+        org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = 
+            new TaskAttemptContextImpl(conf, taskAttemptID);
+        @SuppressWarnings("rawtypes")
+        OutputFormat outputFormat =
+          ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), conf);
+        committer = outputFormat.getOutputCommitter(taskContext);
+      } else {
+        committer = ReflectionUtils.newInstance(conf.getClass(
+            "mapred.output.committer.class", FileOutputCommitter.class,
+            org.apache.hadoop.mapred.OutputCommitter.class), conf);
+      }
+      LOG.info("OutputCommitter is " + committer.getClass().getName());
+      return committer;
+    }
+
+    @Override
+    public void run() {
+      JobID jobId = profile.getJobID();
+      JobContext jContext = new JobContextImpl(job, jobId);
+      
+      org.apache.hadoop.mapreduce.OutputCommitter outputCommitter = null;
+      try {
+        outputCommitter = createOutputCommitter(conf.getUseNewMapper(), jobId, conf);
+      } catch (Exception e) {
+        LOG.info("Failed to createOutputCommitter", e);
+        return;
+      }
+      
+      try {
+        TaskSplitMetaInfo[] taskSplitMetaInfos = 
+          SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
+
+        int numReduceTasks = job.getNumReduceTasks();
+        if (numReduceTasks > 1 || numReduceTasks < 0) {
+          // we only allow 0 or 1 reducer in local mode
+          numReduceTasks = 1;
+          job.setNumReduceTasks(1);
+        }
+        outputCommitter.setupJob(jContext);
+        status.setSetupProgress(1.0f);
+
+        Map<TaskAttemptID, TezTaskOutput> mapOutputFiles =
+            Collections.synchronizedMap(new HashMap<TaskAttemptID, TezTaskOutput>());
+
+        List<MapTaskRunnable> taskRunnables = getMapTaskRunnables(taskSplitMetaInfos,
+            jobId, mapOutputFiles);
+        ExecutorService mapService = createMapExecutor(taskRunnables.size());
+
+        // Start populating the executor with work units.
+        // They may begin running immediately (in other threads).
+        for (Runnable r : taskRunnables) {
+          mapService.submit(r);
+        }
+
+        try {
+          mapService.shutdown(); // Instructs queue to drain.
+
+          // Wait for tasks to finish; do not use a time-based timeout.
+          // (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024)
+          LOG.info("Waiting for map tasks");
+          mapService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException ie) {
+          // Cancel all threads.
+          mapService.shutdownNow();
+          throw ie;
+        }
+
+        LOG.info("Map task executor complete.");
+
+        // After waiting for the map tasks to complete, if any of these
+        // have thrown an exception, rethrow it now in the main thread context.
+        for (MapTaskRunnable r : taskRunnables) {
+          if (r.storedException != null) {
+            throw new Exception(r.storedException);
+          }
+        }
+
+        TaskAttemptID reduceId = new TaskAttemptID(new TaskID(
+            jobId, TaskType.REDUCE, 0), 0);
+        LOG.info("Starting task: " + reduceId);
+        try {
+          if (numReduceTasks > 0) {
+            String user = 
+                UserGroupInformation.getCurrentUser().getShortUserName();
+            JobConf localConf = new JobConf(job);
+            localConf.setUser(user);
+            localConf.set("mapreduce.jobtracker.address", "local");
+            localConf.setInt(TezJobConfig.TEZ_ENGINE_TASK_INDEGREE, mapIds.size());
+            setupChildMapredLocalDirs(reduceId, user, localConf);
+            
+            MRTaskContext taskContext = 
+                new MRTaskContext(
+                    IDConverter.fromMRTaskAttemptId(reduceId), 
+                    user, localConf.getJobName(),
+                    LocalFinalTask.class.getName(), null, "", 0, mapIds.size()); 
+            Injector injector = Guice.createInjector(new LocalFinalTask());
+
+            // move map output to reduce input  
+            for (int i = 0; i < mapIds.size(); i++) {
+              if (!this.isInterrupted()) {
+                TaskAttemptID mapId = mapIds.get(i);
+                LOG.info("XXX mapId: " + i + 
+                    " LOCAL_DIR = " + 
+                    mapOutputFiles.get(mapId).getConf().get(
+                        TezJobConfig.LOCAL_DIR));
+                Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
+                TezTaskOutput localOutputFile = new TezLocalTaskOutputFiles();
+                localOutputFile.setConf(localConf);
+                Path reduceIn =
+                  localOutputFile.getInputFileForWrite(
+                      IDConverter.fromMRTaskId(mapId.getTaskID()),
+                        localFs.getFileStatus(mapOut).getLen());
+                if (!localFs.mkdirs(reduceIn.getParent())) {
+                  throw new IOException("Mkdirs failed to create "
+                      + reduceIn.getParent().toString());
+                }
+                if (!localFs.rename(mapOut, reduceIn))
+                  throw new IOException("Couldn't rename " + mapOut);
+              } else {
+                throw new InterruptedException();
+              }
+            }
+            if (!this.isInterrupted()) {
+              reduce_tasks += 1;
+              myMetrics.launchReduce(reduceId);
+              TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
+              Task t = factory.createTask(taskContext);
+              t.initialize(localConf, Job.this);
+              t.run();
+              myMetrics.completeReduce(reduceId);
+              reduce_tasks -= 1;
+            } else {
+              throw new InterruptedException();
+            }
+          }
+        } finally {
+          for (TezTaskOutput output : mapOutputFiles.values()) {
+            output.removeAll();
+          }
+        }
+        // delete the temporary directory in output directory
+        // FIXME
+        //outputCommitter.commitJob(jContext);
+        status.setCleanupProgress(1.0f);
+
+        if (killed) {
+          this.status.setRunState(JobStatus.KILLED);
+        } else {
+          this.status.setRunState(JobStatus.SUCCEEDED);
+        }
+
+        JobEndNotifier.localRunnerNotification(job, status);
+
+      } catch (Throwable t) {
+        try {
+          outputCommitter.abortJob(jContext, 
+            org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
+        } catch (IOException ioe) {
+          LOG.info("Error cleaning up job:" + id);
+        }
+        status.setCleanupProgress(1.0f);
+        if (killed) {
+          this.status.setRunState(JobStatus.KILLED);
+        } else {
+          this.status.setRunState(JobStatus.FAILED);
+        }
+        LOG.warn(id, t);
+
+        JobEndNotifier.localRunnerNotification(job, status);
+
+      } finally {
+        try {
+          fs.delete(systemJobFile.getParent(), true);  // delete submit dir
+          localFs.delete(localJobFile, true);              // delete local copy
+          // Cleanup distributed cache
+          localDistributedCacheManager.close();
+        } catch (IOException e) {
+          LOG.warn("Error cleaning up "+id+": "+e);
+        }
+      }
+    }
+
+    // TaskUmbilicalProtocol methods
+    @Override
+    public ContainerTask getTask(ContainerContext containerContext)
+        throws IOException {
+      return null;
+    }
+    
+    @Override
+    public synchronized boolean statusUpdate(TezTaskAttemptID taskId,
+        TezTaskStatus taskStatus) throws IOException, InterruptedException {
+      LOG.info(taskStatus.getStateString());
+      int taskIndex = mapIds.indexOf(taskId);
+      if (taskIndex >= 0) {                       // mapping
+        float numTasks = (float) this.numMapTasks;
+
+        partialMapProgress[taskIndex] = taskStatus.getProgress();
+        mapCounters[taskIndex] = taskStatus.getCounters();
+
+        float partialProgress = 0.0f;
+        for (float f : partialMapProgress) {
+          partialProgress += f;
+        }
+        status.setMapProgress(partialProgress / numTasks);
+      } else {
+        reduceCounters = taskStatus.getCounters();
+        status.setReduceProgress(taskStatus.getProgress());
+      }
+
+      // ignore phase
+      return true;
+    }
+
+    /** Return the current values of the counters for this job,
+     * including tasks that are in progress.
+     */
+    public synchronized TezCounters getCurrentCounters() {
+      if (null == mapCounters) {
+        // Counters not yet initialized for job.
+        return EMPTY_COUNTERS;
+      }
+
+      // Accumulate into a fresh instance so the shared EMPTY_COUNTERS stays empty.
+      TezCounters current = new TezCounters();
+      for (TezCounters c : mapCounters) {
+        current.incrAllCounters(c);
+      }
+      current.incrAllCounters(reduceCounters);
+      return current;
+    }
+
+    /**
+     * Task is reporting that it is in commit_pending
+     * and it is waiting for the commit Response
+     */
+    @Override
+    public void commitPending(TezTaskAttemptID taskid,
+                              TezTaskStatus taskStatus) 
+    throws IOException, InterruptedException {
+      statusUpdate(taskid, taskStatus);
+    }
+
+    @Override
+    public void reportDiagnosticInfo(TezTaskAttemptID taskid, String trace) {
+      // Ignore for now
+    }
+    
+    @Override
+    public boolean ping(TezTaskAttemptID taskid) throws IOException {
+      return true;
+    }
+    
+    @Override
+    public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
+      return true;
+    }
+    
+    @Override
+    public void done(TezTaskAttemptID taskId) throws IOException {
+      int taskIndex = mapIds.indexOf(taskId);
+      if (taskIndex >= 0) {                       // mapping
+        status.setMapProgress(1.0f);
+      } else {
+        status.setReduceProgress(1.0f);
+      }
+    }
+
+    @Override
+    public synchronized void fsError(TezTaskAttemptID taskId, String message) 
+    throws IOException {
+      LOG.fatal("FSError: "+ message + "from task: " + taskId);
+    }
+
+    @Override
+    public void shuffleError(TezTaskAttemptID taskId, String message) 
+        throws IOException {
+      LOG.fatal("shuffleError: "+ message + "from task: " + taskId);
+    }
+    
+    @Override
+    public synchronized void fatalError(TezTaskAttemptID taskId, String msg) 
+    throws IOException {
+      LOG.fatal("Fatal: "+ msg + "from task: " + taskId);
+    }
+    
+    @Override
+    public TezTaskDependencyCompletionEventsUpdate 
+    getDependentTasksCompletionEvents(
+        TezJobID jobID, int fromEventIdx, int maxEventsToFetch,
+        TezTaskAttemptID reduce) {
+      throw new UnsupportedOperationException(
+          "getDependentTasksCompletionEvents not supported in LocalJobRunner");
+    }
+
+    @Override
+    public void outputReady(TezTaskAttemptID taskAttemptId,
+        OutputContext outputContext) throws IOException {
+      // Ignore for now.
+    }
+
+    @Override
+    public ProceedToCompletionResponse proceedToCompletion(
+        TezTaskAttemptID taskAttemptId) throws IOException {
+      // TODO TEZAM5 Really depends on the module - inmem shuffle or not.
+      return new ProceedToCompletionResponse(true, true);
+    }
+  }
+
+  public LocalJobRunner(Configuration conf) throws IOException {
+    this(new JobConf(conf));
+  }
+
+  @Deprecated
+  public LocalJobRunner(JobConf conf) throws IOException {
+    this.fs = FileSystem.getLocal(conf);
+    this.conf = conf;
+    myMetrics = new LocalJobRunnerMetrics(new JobConf(conf));
+  }
+
+  // JobSubmissionProtocol methods
+
+  private static int jobid = 0;
+  public synchronized org.apache.hadoop.mapreduce.JobID getNewJobID() {
+    return new org.apache.hadoop.mapreduce.JobID("local", ++jobid);
+  }
+
+  public org.apache.hadoop.mapreduce.JobStatus submitJob(
+      org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
+      Credentials credentials) throws IOException {
+    Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);
+    job.job.setCredentials(credentials);
+    return job.status;
+
+  }
+
+  public void killJob(org.apache.hadoop.mapreduce.JobID id) {
+    jobs.get(JobID.downgrade(id)).killed = true;
+    jobs.get(JobID.downgrade(id)).interrupt();
+  }
+
+  public void setJobPriority(org.apache.hadoop.mapreduce.JobID id,
+      String jp) throws IOException {
+    throw new UnsupportedOperationException("Changing job priority " +
+                      "in LocalJobRunner is not supported.");
+  }
+  
+  /** Throws {@link UnsupportedOperationException} */
+  public boolean killTask(org.apache.hadoop.mapreduce.TaskAttemptID taskId,
+      boolean shouldFail) throws IOException {
+    throw new UnsupportedOperationException("Killing tasks in " +
+    "LocalJobRunner is not supported");
+  }
+
+  public org.apache.hadoop.mapreduce.TaskReport[] getTaskReports(
+      org.apache.hadoop.mapreduce.JobID id, TaskType type) {
+    return new org.apache.hadoop.mapreduce.TaskReport[0];
+  }
+
+  public org.apache.hadoop.mapreduce.JobStatus getJobStatus(
+      org.apache.hadoop.mapreduce.JobID id) {
+    Job job = jobs.get(JobID.downgrade(id));
+    if(job != null)
+      return job.status;
+    else 
+      return null;
+  }
+  
+  public org.apache.hadoop.mapreduce.Counters getJobCounters(
+      org.apache.hadoop.mapreduce.JobID id) {
+    Job job = jobs.get(JobID.downgrade(id));
+
+    return new org.apache.hadoop.mapreduce.Counters(
+        new MRCounters(job.getCurrentCounters()));
+  }
+
+  public String getFilesystemName() throws IOException {
+    return fs.getUri().toString();
+  }
+  
+  public ClusterMetrics getClusterMetrics() {
+    int numMapTasks = map_tasks.get();
+    return new ClusterMetrics(numMapTasks, reduce_tasks, numMapTasks,
+        reduce_tasks, 0, 0, 1, 1, jobs.size(), 1, 0, 0);
+  }
+
+  public JobTrackerStatus getJobTrackerStatus() {
+    return JobTrackerStatus.RUNNING;
+  }
+
+  public long getTaskTrackerExpiryInterval() 
+      throws IOException, InterruptedException {
+    return 0;
+  }
+
+  /** 
+   * Get all active trackers in cluster. 
+   * @return array of TaskTrackerInfo
+   */
+  public TaskTrackerInfo[] getActiveTrackers() 
+      throws IOException, InterruptedException {
+    return null;
+  }
+
+  /** 
+   * Get all blacklisted trackers in cluster. 
+   * @return array of TaskTrackerInfo
+   */
+  public TaskTrackerInfo[] getBlacklistedTrackers() 
+      throws IOException, InterruptedException {
+    return null;
+  }
+
+  public TaskCompletionEvent[] getTaskCompletionEvents(
+      org.apache.hadoop.mapreduce.JobID jobid
+      , int fromEventId, int maxEvents) throws IOException {
+    return TaskCompletionEvent.EMPTY_ARRAY;
+  }
+  
+  public org.apache.hadoop.mapreduce.JobStatus[] getAllJobs() { return null; }
+
+  
+  /**
+   * Returns the diagnostic information for a particular task in the given job.
+   * To be implemented
+   */
+  public String[] getTaskDiagnostics(
+      org.apache.hadoop.mapreduce.TaskAttemptID taskid) throws IOException{
+    return new String[0];
+  }
+
+  /**
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getSystemDir()
+   */
+  public String getSystemDir() {
+    Path sysDir = new Path(
+      conf.get(JTConfig.JT_SYSTEM_DIR, "/tmp/hadoop/mapred/system"));  
+    return fs.makeQualified(sysDir).toString();
+  }
+
+  /**
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getQueueAdmins(String)
+   */
+  public AccessControlList getQueueAdmins(String queueName) throws IOException {
+    return new AccessControlList(" "); // no queue admins for local job runner
+  }
+
+  /**
+   * @see org.apache.hadoop.mapreduce.protocol.ClientProtocol#getStagingAreaDir()
+   */
+  public String getStagingAreaDir() throws IOException {
+    Path stagingRootDir = new Path(conf.get(JTConfig.JT_STAGING_AREA_ROOT, 
+        "/tmp/hadoop/mapred/staging"));
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    String user;
+    if (ugi != null) {
+      user = ugi.getShortUserName() + rand.nextInt();
+    } else {
+      user = "dummy" + rand.nextInt();
+    }
+    return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
+  }
+  
+  public String getJobHistoryDir() {
+    return null;
+  }
+
+  @Override
+  public QueueInfo[] getChildQueues(String queueName) throws IOException {
+    return null;
+  }
+
+  @Override
+  public QueueInfo[] getRootQueues() throws IOException {
+    return null;
+  }
+
+  @Override
+  public QueueInfo[] getQueues() throws IOException {
+    return null;
+  }
+
+
+  @Override
+  public QueueInfo getQueue(String queue) throws IOException {
+    return null;
+  }
+
+  @Override
+  public org.apache.hadoop.mapreduce.QueueAclsInfo[] 
+      getQueueAclsForCurrentUser() throws IOException{
+    return null;
+  }
+
+  /**
+   * Set the max number of map tasks to run concurrently in the LocalJobRunner.
+   * @param job the job to configure
+   * @param maxMaps the maximum number of map tasks to allow.
+   */
+  public static void setLocalMaxRunningMaps(
+      org.apache.hadoop.mapreduce.JobContext job,
+      int maxMaps) {
+    job.getConfiguration().setInt(LOCAL_MAX_MAPS, maxMaps);
+  }
+
+  /**
+   * @return the max number of map tasks to run concurrently in the
+   * LocalJobRunner.
+   */
+  public static int getLocalMaxRunningMaps(
+      org.apache.hadoop.mapreduce.JobContext job) {
+    return job.getConfiguration().getInt(LOCAL_MAX_MAPS, 1);
+  }
+
+  @Override
+  public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
+                                       ) throws IOException,
+                                                InterruptedException {
+  }
+
+  @Override
+  public Token<DelegationTokenIdentifier> 
+     getDelegationToken(Text renewer) throws IOException, InterruptedException {
+    return null;
+  }
+
+  @Override
+  public long renewDelegationToken(Token<DelegationTokenIdentifier> token
+                                      ) throws IOException,InterruptedException{
+    return 0;
+  }
+
+  @Override
+  public LogParams getLogFileParams(org.apache.hadoop.mapreduce.JobID jobID,
+      org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptID)
+      throws IOException, InterruptedException {
+    throw new UnsupportedOperationException("Not supported");
+  }
+  
+  static void setupChildMapredLocalDirs(
+      TaskAttemptID taskAttemptID, String user, JobConf conf) {
+    String[] localDirs = 
+        conf.getTrimmedStrings(
+            TezJobConfig.LOCAL_DIR, TezJobConfig.DEFAULT_LOCAL_DIR);
+    String jobId = taskAttemptID.getJobID().toString();
+    String taskId = taskAttemptID.getTaskID().toString();
+    boolean isCleanup = false;
+    StringBuffer childMapredLocalDir =
+        new StringBuffer(localDirs[0] + Path.SEPARATOR
+            + getLocalTaskDir(user, jobId, taskId, isCleanup));
+    for (int i = 1; i < localDirs.length; i++) {
+      childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
+          + getLocalTaskDir(user, jobId, taskId, isCleanup));
+    }
+    LOG.info(TezJobConfig.LOCAL_DIR + " for child : " + taskAttemptID + 
+        " is " + childMapredLocalDir);
+    conf.set(TezJobConfig.LOCAL_DIR, childMapredLocalDir.toString());
+    conf.setClass(Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER, 
+        TezLocalTaskOutputFiles.class, TezTaskOutput.class);
+  }
+  
+  static final String TASK_CLEANUP_SUFFIX = ".cleanup";
+  static final String SUBDIR = jobDir;
+  static final String JOBCACHE = "jobcache";
+  
+  static String getLocalTaskDir(String user, String jobid, String taskid,
+      boolean isCleanupAttempt) {
+    String taskDir = SUBDIR + Path.SEPARATOR + user + Path.SEPARATOR + JOBCACHE
+      + Path.SEPARATOR + jobid + Path.SEPARATOR + taskid;
+    if (isCleanupAttempt) {
+      taskDir = taskDir + TASK_CLEANUP_SUFFIX;
+    }
+    return taskDir;
+  }
+  
+  
+}
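
A minimal driver sketch, not part of this commit, showing how a client might
cap the runner's map parallelism through the setLocalMaxRunningMaps helper
above. The "mapreduce.framework.name" key and the Job API are standard Hadoop
and assumed available here; the class and job names are made up.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.LocalJobRunner;
import org.apache.hadoop.mapreduce.Job;

public class LocalRunnerDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Route submission through the in-process runner.
    conf.set("mapreduce.framework.name", "local");
    Job job = Job.getInstance(conf, "local-debug-job");
    // Sets mapreduce.local.map.tasks.maximum (LOCAL_MAX_MAPS) to 4.
    LocalJobRunner.setLocalMaxRunningMaps(job, 4);
    // ... configure mapper, reducer, input and output paths, then:
    // System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}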

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerMetrics.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.metrics.jvm.JvmMetrics;
+
+@SuppressWarnings("deprecation")
+class LocalJobRunnerMetrics implements Updater {
+  private final MetricsRecord metricsRecord;
+
+  private int numMapTasksLaunched = 0;
+  private int numMapTasksCompleted = 0;
+  private int numReduceTasksLaunched = 0;
+  private int numReduceTasksCompleted = 0;
+  private int numWaitingMaps = 0;
+  private int numWaitingReduces = 0;
+  
+  public LocalJobRunnerMetrics(JobConf conf) {
+    String sessionId = conf.getSessionId();
+    // Initiate JVM Metrics
+    JvmMetrics.init("JobTracker", sessionId);
+    // Create a record for map-reduce metrics
+    MetricsContext context = MetricsUtil.getContext("mapred");
+    // record name is jobtracker for compatibility 
+    metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
+    metricsRecord.setTag("sessionId", sessionId);
+    context.registerUpdater(this);
+  }
+    
+  /**
+   * Since this object is a registered updater, this method will be called
+   * periodically, e.g. every 5 seconds.
+   */
+  public void doUpdates(MetricsContext unused) {
+    synchronized (this) {
+      metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
+      metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
+      metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
+      metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
+      metricsRecord.incrMetric("waiting_maps", numWaitingMaps);
+      metricsRecord.incrMetric("waiting_reduces", numWaitingReduces);
+
+      numMapTasksLaunched = 0;
+      numMapTasksCompleted = 0;
+      numReduceTasksLaunched = 0;
+      numReduceTasksCompleted = 0;
+      numWaitingMaps = 0;
+      numWaitingReduces = 0;
+    }
+    metricsRecord.update();
+  }
+
+  public synchronized void launchMap(TaskAttemptID taskAttemptID) {
+    ++numMapTasksLaunched;
+    decWaitingMaps(taskAttemptID.getJobID(), 1);
+  }
+
+  public synchronized void completeMap(TaskAttemptID taskAttemptID) {
+    ++numMapTasksCompleted;
+  }
+
+  public synchronized void launchReduce(TaskAttemptID taskAttemptID) {
+    ++numReduceTasksLaunched;
+    decWaitingReduces(taskAttemptID.getJobID(), 1);
+  }
+
+  public synchronized void completeReduce(TaskAttemptID taskAttemptID) {
+    ++numReduceTasksCompleted;
+  }
+
+  private synchronized void decWaitingMaps(JobID id, int task) {
+    numWaitingMaps -= task;
+  }
+  
+  private synchronized void decWaitingReduces(JobID id, int task){
+    numWaitingReduces -= task;
+  }
+
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/ContainerContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/ContainerContext.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/ContainerContext.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/ContainerContext.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,70 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.records.TezContainerId;
+
+// TODO EVENTUALLY move this over to PB. Fix package/module.
+// TODO EVENTUALLY unit tests for functionality.
+public class ContainerContext implements Writable {
+
+  ContainerId containerId;
+  String pid;
+
+  public ContainerContext() {
+    containerId = Records.newRecord(ContainerId.class);
+    pid = "";
+  }
+
+  public ContainerContext(ContainerId containerId, String pid) {
+    this.containerId = containerId;
+    this.pid = pid;
+  }
+
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  public String getPid() {
+    return pid;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    TezContainerId tezContainerId = new TezContainerId();
+    tezContainerId.readFields(in);
+    this.containerId = tezContainerId.getContainerId();
+    this.pid = Text.readString(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    TezContainerId tezContainerId = new TezContainerId(containerId);
+    tezContainerId.write(out);
+    Text.writeString(out, pid);
+  }
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/ContainerTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/ContainerTask.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/ContainerTask.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/ContainerTask.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.mapreduce.task.impl.MRTaskContext;
+
+public class ContainerTask implements Writable {
+
+  MRTaskContext mrTaskContext;
+  boolean shouldDie;
+
+  public ContainerTask() {
+  }
+
+  public ContainerTask(MRTaskContext mrTaskContext, boolean shouldDie) {
+    this.mrTaskContext = mrTaskContext;
+    this.shouldDie = shouldDie;
+  }
+
+  public MRTaskContext getMrTaskContext() {
+    return mrTaskContext;
+  }
+
+  public boolean shouldDie() {
+    return shouldDie;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(shouldDie);
+    if (mrTaskContext != null) {
+      out.writeBoolean(true);
+      mrTaskContext.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    shouldDie = in.readBoolean();
+    boolean taskComing = in.readBoolean();
+    if (taskComing) {
+      mrTaskContext = new MRTaskContext();
+      mrTaskContext.readFields(in);
+    }
+  }
+
+  public String toString() {
+    return "shouldDie: " + shouldDie + ", mrTaskContext: " + mrTaskContext;
+  }
+}
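
To illustrate the optional-field protocol above (a boolean presence flag
written before mrTaskContext), here is a round-trip sketch using only JDK
streams; it is not part of the commit and the class name is made up.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.tez.mapreduce.hadoop.ContainerTask;

public class ContainerTaskRoundTrip {
  public static void main(String[] args) throws IOException {
    // A "should die" signal carries no task context, so write() emits
    // shouldDie=true followed by a presence flag of false.
    ContainerTask original = new ContainerTask(null, true);

    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    original.write(new DataOutputStream(buffer));

    ContainerTask copy = new ContainerTask();
    copy.readFields(
        new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

    System.out.println(copy); // shouldDie: true, mrTaskContext: null
  }
}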

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,96 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+
+
+public class DeprecatedKeys {
+  static {
+    addDeprecatedKeys();
+  }
+
+  // TODO TEZAM4 At some point, make sure this gets loaded by default instead of the current initialization in MRAppMaster and TezChild.
+  // Maybe define in an TEZConfiguration / TEZ JobConf variant.
+  
+  public static void init() {
+  }
+  
+  private static void addDeprecatedKeys() {
+    
+    _(MRConfig.MAPRED_IFILE_READAHEAD, TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD);
+
+    _(MRConfig.MAPRED_IFILE_READAHEAD_BYTES, TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD_BYTES);
+    
+    _(MRJobConfig.RECORDS_BEFORE_PROGRESS, TezJobConfig.RECORDS_BEFORE_PROGRESS);
+    
+    _(MRJobConfig.JOB_LOCAL_DIR, MRConfig.LOCAL_DIR);
+        
+    _(MRJobConfig.NUM_REDUCES, TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE);
+
+    _(MRJobConfig.NUM_MAPS, TezJobConfig.TEZ_ENGINE_TASK_INDEGREE);
+    
+    _(MRJobConfig.IO_SORT_FACTOR, TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR);
+    
+    _(MRJobConfig.MAP_SORT_SPILL_PERCENT, TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT);
+    
+    _(MRJobConfig.IO_SORT_MB, TezJobConfig.TEZ_ENGINE_IO_SORT_MB);
+    
+    _(MRJobConfig.INDEX_CACHE_MEMORY_LIMIT, TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES);
+    
+    _(MRJobConfig.MAP_COMBINE_MIN_SPILLS, TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS);
+    
+    _(MRJobConfig.COUNTERS_MAX_KEY, TezJobConfig.COUNTERS_MAX_KEY);
+    
+    _(MRJobConfig.COUNTER_GROUP_NAME_MAX_KEY, TezJobConfig.COUNTER_GROUP_NAME_MAX_KEY);
+    
+    _(MRJobConfig.COUNTER_NAME_MAX_KEY, TezJobConfig.COUNTER_NAME_MAX_KEY);
+    
+    _(MRJobConfig.COUNTER_GROUPS_MAX_KEY, TezJobConfig.COUNTER_GROUPS_MAX_KEY);
+    
+    _(MRJobConfig.SHUFFLE_PARALLEL_COPIES, TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
+    
+    _(MRJobConfig.SHUFFLE_FETCH_FAILURES, TezJobConfig.TEZ_ENGINE_SHUFFLE_FETCH_FAILURES);
+    
+    _(MRJobConfig.SHUFFLE_NOTIFY_READERROR, TezJobConfig.TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR);
+    
+    _(MRJobConfig.SHUFFLE_CONNECT_TIMEOUT, TezJobConfig.TEZ_ENGINE_SHUFFLE_CONNECT_TIMEOUT);
+    
+    _(MRJobConfig.SHUFFLE_READ_TIMEOUT, TezJobConfig.TEZ_ENGINE_SHUFFLE_READ_TIMEOUT);
+    
+    _(MRConfig.SHUFFLE_SSL_ENABLED_KEY, TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_SSL);
+    
+    _(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT);
+    
+    _(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT);
+    
+    _(MRJobConfig.SHUFFLE_MERGE_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_MERGE_PERCENT);
+    
+    _(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMTOMEM_SEGMENTS);
+    
+    _(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM);
+    
+    _(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT);
+  }
+  
+  private static void _(String oldKey, String newKey) {
+    Configuration.addDeprecation(oldKey, newKey);
+  }
+}
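
A small sketch of the effect of these mappings, not part of the commit: once
DeprecatedKeys is loaded, a value set under an old MapReduce key becomes
readable under its Tez replacement through Configuration's deprecation
machinery. The demo class name is made up; the constants are the ones above.

import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;

public class DeprecationDemo {
  public static void main(String[] args) {
    DeprecatedKeys.init(); // forces the static addDeprecatedKeys() block to run
    Configuration conf = new Configuration(false);
    conf.set(MRJobConfig.IO_SORT_MB, "256");  // write through the old key
    // Reads back through the deprecation mapping; prints "256".
    System.out.println(conf.get(TezJobConfig.TEZ_ENGINE_IO_SORT_MB));
  }
}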

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,87 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.hadoop;
+
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.tez.records.TezJobID;
+import org.apache.tez.records.TezTaskAttemptID;
+import org.apache.tez.records.TezTaskID;
+
+public class IDConverter {
+
+  public static JobID toMRJobId(TezJobID jobId) {
+    return new JobID(jobId.getJtIdentifier(), jobId.getId());
+  }
+  
+  @SuppressWarnings("deprecation")
+  public static TaskID toMRTaskId(TezTaskID taskid) {
+    return new TaskID(
+        toMRJobId(taskid.getJobID()),        
+        taskid.getTaskType().equals(MRTaskType.MAP.toString()), 
+        taskid.getId());
+
+  }
+  
+  public static TaskAttemptID toMRTaskAttemptId(
+      TezTaskAttemptID taskAttemptId) {
+    return new TaskAttemptID(
+        toMRTaskId(taskAttemptId.getTaskID()),
+        taskAttemptId.getId());
+  }
+  
+  public static TezJobID fromMRJobId(org.apache.hadoop.mapreduce.JobID jobId) {
+    return new TezJobID(jobId.getJtIdentifier(), jobId.getId());
+  }
+
+  public static MRTaskType fromMRTaskType(TaskType type) {
+    switch (type) {
+      case REDUCE:
+        return MRTaskType.REDUCE;
+      case JOB_SETUP:
+        return MRTaskType.JOB_SETUP;
+      case JOB_CLEANUP:
+        return MRTaskType.JOB_CLEANUP;
+      case TASK_CLEANUP:
+        return MRTaskType.TASK_CLEANUP;
+      case MAP:
+        return MRTaskType.MAP;
+      default:
+        throw new RuntimeException("Unknown TaskType: " + type);
+    }
+  }
+
+  public static TezTaskID
+      fromMRTaskId(org.apache.hadoop.mapreduce.TaskID taskid) {
+    return new TezTaskID(
+        fromMRJobId(taskid.getJobID()),
+        fromMRTaskType(taskid.getTaskType()).toString(),
+        taskid.getId());
+  }
+
+  public static TezTaskAttemptID fromMRTaskAttemptId(
+      org.apache.hadoop.mapreduce.TaskAttemptID taskAttemptId) {
+    return new TezTaskAttemptID(
+        fromMRTaskId(taskAttemptId.getTaskID()),
+        taskAttemptId.getId());
+  }
+  
+}
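
A short usage sketch for the converters above, not part of the commit. Based
on the code, an attempt id should survive the MR -> Tez -> MR round trip; the
comparison is on string form, since the mapred and mapreduce id classes differ.

import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.tez.mapreduce.hadoop.IDConverter;
import org.apache.tez.records.TezTaskAttemptID;

public class IDConverterDemo {
  public static void main(String[] args) {
    TaskAttemptID mrId = new TaskAttemptID(
        new TaskID(new JobID("local", 1), TaskType.MAP, 0), 0);
    TezTaskAttemptID tezId = IDConverter.fromMRTaskAttemptId(mrId);
    String back = IDConverter.toMRTaskAttemptId(tezId).toString();
    System.out.println(back.equals(mrId.toString())); // expected: true
  }
}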

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRConfig.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.hadoop;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Placeholder for cluster-level configuration keys.
+ *
+ * The keys should have "mapreduce.cluster." as the prefix.
+ *
+ */
+@InterfaceAudience.Private
+public interface MRConfig {
+
+  // Cluster-level configuration parameters
+  public static final String TEMP_DIR = "mapreduce.cluster.temp.dir";
+  public static final String LOCAL_DIR = "mapreduce.cluster.local.dir";
+  public static final String MAPMEMORY_MB = "mapreduce.cluster.mapmemory.mb";
+  public static final String REDUCEMEMORY_MB = 
+    "mapreduce.cluster.reducememory.mb";
+  public static final String MR_ACLS_ENABLED = "mapreduce.cluster.acls.enabled";
+  public static final String MR_ADMINS =
+    "mapreduce.cluster.administrators";
+  @Deprecated
+  public static final String MR_SUPERGROUP =
+    "mapreduce.cluster.permissions.supergroup";
+
+  //Delegation token related keys
+  public static final String  DELEGATION_KEY_UPDATE_INTERVAL_KEY = 
+    "mapreduce.cluster.delegation.key.update-interval";
+  public static final long    DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT = 
+    24*60*60*1000; // 1 day
+  public static final String  DELEGATION_TOKEN_RENEW_INTERVAL_KEY = 
+    "mapreduce.cluster.delegation.token.renew-interval";
+  public static final long    DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT = 
+    24*60*60*1000;  // 1 day
+  public static final String  DELEGATION_TOKEN_MAX_LIFETIME_KEY = 
+    "mapreduce.cluster.delegation.token.max-lifetime";
+  public static final long    DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT = 
+    7*24*60*60*1000; // 7 days
+  
+  public static final String RESOURCE_CALCULATOR_PROCESS_TREE =
+    "mapreduce.job.process-tree.class";
+  public static final String STATIC_RESOLUTIONS = 
+    "mapreduce.job.net.static.resolutions";
+
+  public static final String MASTER_ADDRESS  = "mapreduce.jobtracker.address";
+  public static final String MASTER_USER_NAME = 
+    "mapreduce.jobtracker.kerberos.principal";
+
+  public static final String FRAMEWORK_NAME  = "mapreduce.framework.name";
+  public static final String CLASSIC_FRAMEWORK_NAME  = "classic";
+  public static final String YARN_TEZ_FRAMEWORK_NAME  = "yarn-tez";
+  public static final String LOCAL_FRAMEWORK_NAME = "local";
+
+  public static final String TASK_LOCAL_OUTPUT_CLASS =
+  "mapreduce.task.local.output.class";
+
+  public static final String PROGRESS_STATUS_LEN_LIMIT_KEY =
+    "mapreduce.task.max.status.length";
+  public static final int PROGRESS_STATUS_LEN_LIMIT_DEFAULT = 512;
+
+  public static final int MAX_BLOCK_LOCATIONS_DEFAULT = 10;
+  public static final String MAX_BLOCK_LOCATIONS_KEY =
+    "mapreduce.job.max.split.locations";
+
+  public static final String SHUFFLE_SSL_ENABLED_KEY =
+    "mapreduce.shuffle.ssl.enabled";
+
+  public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
+
+  /**
+   * Configuration key to enable/disable IFile readahead.
+   */
+  public static final String MAPRED_IFILE_READAHEAD =
+    "mapreduce.ifile.readahead";
+
+  public static final boolean DEFAULT_MAPRED_IFILE_READAHEAD = true;
+
+  /**
+   * Configuration key to set the IFile readahead length in bytes.
+   */
+  public static final String MAPRED_IFILE_READAHEAD_BYTES =
+    "mapreduce.ifile.readahead.bytes";
+
+  public static final int DEFAULT_MAPRED_IFILE_READAHEAD_BYTES =
+    4 * 1024 * 1024;
+}

Added: incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java (added)
+++ incubator/tez/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,652 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.hadoop;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface MRJobConfig {
+
+  // Put all of the attribute names in here so that Job and JobContext are
+  // consistent.
+  public static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.job.inputformat.class";
+
+  public static final String MAP_CLASS_ATTR = "mapreduce.job.map.class";
+
+  public static final String COMBINE_CLASS_ATTR = "mapreduce.job.combine.class";
+
+  public static final String REDUCE_CLASS_ATTR = "mapreduce.job.reduce.class";
+
+  public static final String OUTPUT_FORMAT_CLASS_ATTR = "mapreduce.job.outputformat.class";
+
+  public static final String PARTITIONER_CLASS_ATTR = "mapreduce.job.partitioner.class";
+
+  public static final String SETUP_CLEANUP_NEEDED = "mapreduce.job.committer.setup.cleanup.needed";
+
+  public static final String TASK_CLEANUP_NEEDED = "mapreduce.job.committer.task.cleanup.needed";
+
+  public static final String JAR = "mapreduce.job.jar";
+
+  public static final String ID = "mapreduce.job.id";
+
+  public static final String JOB_NAME = "mapreduce.job.name";
+
+  public static final String JAR_UNPACK_PATTERN = "mapreduce.job.jar.unpack.pattern";
+
+  public static final String USER_NAME = "mapreduce.job.user.name";
+
+  public static final String PRIORITY = "mapreduce.job.priority";
+
+  public static final String QUEUE_NAME = "mapreduce.job.queuename";
+
+  public static final String JVM_NUMTASKS_TORUN = "mapreduce.job.jvm.numtasks";
+
+  public static final String SPLIT_FILE = "mapreduce.job.splitfile";
+
+  public static final String NUM_MAPS = "mapreduce.job.maps";
+
+  public static final String MAX_TASK_FAILURES_PER_TRACKER = "mapreduce.job.maxtaskfailures.per.tracker";
+
+  public static final String COMPLETED_MAPS_FOR_REDUCE_SLOWSTART = "mapreduce.job.reduce.slowstart.completedmaps";
+
+  public static final String NUM_REDUCES = "mapreduce.job.reduces";
+
+  public static final String SKIP_RECORDS = "mapreduce.job.skiprecords";
+
+  public static final String SKIP_OUTDIR = "mapreduce.job.skip.outdir";
+
+  public static final String SPECULATIVE_SLOWNODE_THRESHOLD = "mapreduce.job.speculative.slownodethreshold";
+
+  public static final String SPECULATIVE_SLOWTASK_THRESHOLD = "mapreduce.job.speculative.slowtaskthreshold";
+
+  public static final String SPECULATIVECAP = "mapreduce.job.speculative.speculativecap";
+
+  public static final String JOB_LOCAL_DIR = "mapreduce.job.local.dir";
+
+  public static final String OUTPUT_KEY_CLASS = "mapreduce.job.output.key.class";
+
+  public static final String OUTPUT_VALUE_CLASS = "mapreduce.job.output.value.class";
+
+  public static final String KEY_COMPARATOR = "mapreduce.job.output.key.comparator.class";
+
+  public static final String GROUP_COMPARATOR_CLASS = "mapreduce.job.output.group.comparator.class";
+
+  public static final String WORKING_DIR = "mapreduce.job.working.dir";
+
+  public static final String CLASSPATH_ARCHIVES = "mapreduce.job.classpath.archives";
+
+  public static final String CLASSPATH_FILES = "mapreduce.job.classpath.files";
+
+  public static final String CACHE_FILES = "mapreduce.job.cache.files";
+
+  public static final String CACHE_ARCHIVES = "mapreduce.job.cache.archives";
+
+  public static final String CACHE_FILES_SIZES = "mapreduce.job.cache.files.filesizes"; // internal use only
+
+  public static final String CACHE_ARCHIVES_SIZES = "mapreduce.job.cache.archives.filesizes"; // ditto
+
+  public static final String CACHE_LOCALFILES = "mapreduce.job.cache.local.files";
+
+  public static final String CACHE_LOCALARCHIVES = "mapreduce.job.cache.local.archives";
+
+  public static final String CACHE_FILE_TIMESTAMPS = "mapreduce.job.cache.files.timestamps";
+
+  public static final String CACHE_ARCHIVES_TIMESTAMPS = "mapreduce.job.cache.archives.timestamps";
+
+  public static final String CACHE_FILE_VISIBILITIES = "mapreduce.job.cache.files.visibilities";
+
+  public static final String CACHE_ARCHIVES_VISIBILITIES = "mapreduce.job.cache.archives.visibilities";
+
+  /**
+   * @deprecated Symlinks are always on and cannot be disabled.
+   */
+  @Deprecated
+  public static final String CACHE_SYMLINK = "mapreduce.job.cache.symlink.create";
+
+  public static final String USER_LOG_RETAIN_HOURS = "mapreduce.job.userlog.retain.hours";
+
+  public static final String MAPREDUCE_JOB_USER_CLASSPATH_FIRST = "mapreduce.job.user.classpath.first";
+
+  public static final String IO_SORT_FACTOR = "mapreduce.task.io.sort.factor";
+
+  public static final String IO_SORT_MB = "mapreduce.task.io.sort.mb";
+
+  public static final String INDEX_CACHE_MEMORY_LIMIT = "mapreduce.task.index.cache.limit.bytes";
+
+  public static final String PRESERVE_FAILED_TASK_FILES = "mapreduce.task.files.preserve.failedtasks";
+
+  public static final String PRESERVE_FILES_PATTERN = "mapreduce.task.files.preserve.filepattern";
+
+  public static final String TASK_TEMP_DIR = "mapreduce.task.tmp.dir";
+
+  public static final String TASK_DEBUGOUT_LINES = "mapreduce.task.debugout.lines";
+
+  public static final String RECORDS_BEFORE_PROGRESS = "mapreduce.task.merge.progress.records";
+
+  public static final String SKIP_START_ATTEMPTS = "mapreduce.task.skip.start.attempts";
+
+  public static final String TASK_ATTEMPT_ID = "mapreduce.task.attempt.id";
+
+  public static final String TASK_ISMAP = "mapreduce.task.ismap";
+
+  public static final String TASK_PARTITION = "mapreduce.task.partition";
+
+  public static final String TASK_PROFILE = "mapreduce.task.profile";
+
+  public static final String TASK_PROFILE_PARAMS = "mapreduce.task.profile.params";
+
+  public static final String NUM_MAP_PROFILES = "mapreduce.task.profile.maps";
+
+  public static final String NUM_REDUCE_PROFILES = "mapreduce.task.profile.reduces";
+
+  public static final String TASK_MAP_PROFILE_PARAMS = "mapreduce.task.profile.map.params";
+  
+  public static final String TASK_REDUCE_PROFILE_PARAMS = "mapreduce.task.profile.reduce.params";
+  
+  public static final String TASK_TIMEOUT = "mapreduce.task.timeout";
+
+  public static final String TASK_TIMEOUT_CHECK_INTERVAL_MS = "mapreduce.task.timeout.check-interval-ms";
+  
+  public static final String TASK_ID = "mapreduce.task.id";
+
+  public static final String TASK_OUTPUT_DIR = "mapreduce.task.output.dir";
+
+  public static final String TASK_USERLOG_LIMIT = "mapreduce.task.userlog.limit.kb";
+
+  public static final String MAP_SORT_SPILL_PERCENT = "mapreduce.map.sort.spill.percent";
+
+  public static final String MAP_INPUT_FILE = "mapreduce.map.input.file";
+
+  public static final String MAP_INPUT_PATH = "mapreduce.map.input.length"; // sic: the value is the input-length key, not a path
+
+  public static final String MAP_INPUT_START = "mapreduce.map.input.start";
+
+  public static final String MAP_MEMORY_MB = "mapreduce.map.memory.mb";
+  public static final int DEFAULT_MAP_MEMORY_MB = 1024;
+
+  public static final String MAP_CPU_VCORES = "mapreduce.map.cpu.vcores";
+  public static final int DEFAULT_MAP_CPU_VCORES = 1;
+
+  public static final String MAP_MEMORY_PHYSICAL_MB = "mapreduce.map.memory.physical.mb";
+
+  public static final String MAP_ENV = "mapreduce.map.env";
+
+  public static final String MAP_JAVA_OPTS = "mapreduce.map.java.opts";
+
+  public static final String MAP_MAX_ATTEMPTS = "mapreduce.map.maxattempts";
+
+  public static final String MAP_DEBUG_SCRIPT = "mapreduce.map.debug.script";
+
+  public static final String MAP_SPECULATIVE = "mapreduce.map.speculative";
+
+  public static final String MAP_FAILURES_MAX_PERCENT = "mapreduce.map.failures.maxpercent";
+
+  public static final String MAP_SKIP_INCR_PROC_COUNT = "mapreduce.map.skip.proc-count.auto-incr";
+
+  public static final String MAP_SKIP_MAX_RECORDS = "mapreduce.map.skip.maxrecords";
+
+  public static final String MAP_COMBINE_MIN_SPILLS = "mapreduce.map.combine.minspills";
+
+  public static final String MAP_OUTPUT_COMPRESS = "mapreduce.map.output.compress";
+
+  public static final String MAP_OUTPUT_COMPRESS_CODEC = "mapreduce.map.output.compress.codec";
+
+  public static final String MAP_OUTPUT_KEY_CLASS = "mapreduce.map.output.key.class";
+
+  public static final String MAP_OUTPUT_VALUE_CLASS = "mapreduce.map.output.value.class";
+
+  public static final String MAP_OUTPUT_KEY_FIELD_SEPARATOR = "mapreduce.map.output.key.field.separator";
+
+  public static final String MAP_LOG_LEVEL = "mapreduce.map.log.level";
+
+  public static final String REDUCE_LOG_LEVEL = "mapreduce.reduce.log.level";
+
+  public static final String DEFAULT_LOG_LEVEL = "INFO";
+
+  public static final String REDUCE_MERGE_INMEM_THRESHOLD = "mapreduce.reduce.merge.inmem.threshold";
+
+  public static final String REDUCE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.input.buffer.percent";
+
+  public static final String REDUCE_MARKRESET_BUFFER_PERCENT = "mapreduce.reduce.markreset.buffer.percent";
+
+  public static final String REDUCE_MARKRESET_BUFFER_SIZE = "mapreduce.reduce.markreset.buffer.size";
+
+  public static final String REDUCE_MEMORY_PHYSICAL_MB = "mapreduce.reduce.memory.physical.mb";
+
+  public static final String REDUCE_MEMORY_MB = "mapreduce.reduce.memory.mb";
+  public static final int DEFAULT_REDUCE_MEMORY_MB = 1024;
+
+  public static final String REDUCE_CPU_VCORES = "mapreduce.reduce.cpu.vcores";
+  public static final int DEFAULT_REDUCE_CPU_VCORES = 1;
+
+  public static final String REDUCE_MEMORY_TOTAL_BYTES = "mapreduce.reduce.memory.totalbytes";
+
+  public static final String SHUFFLE_INPUT_BUFFER_PERCENT = "mapreduce.reduce.shuffle.input.buffer.percent";
+
+  public static final String SHUFFLE_MEMORY_LIMIT_PERCENT
+    = "mapreduce.reduce.shuffle.memory.limit.percent";
+
+  public static final String SHUFFLE_MERGE_PERCENT = "mapreduce.reduce.shuffle.merge.percent";
+
+  public static final String REDUCE_FAILURES_MAXPERCENT = "mapreduce.reduce.failures.maxpercent";
+
+  public static final String REDUCE_ENV = "mapreduce.reduce.env";
+
+  public static final String REDUCE_JAVA_OPTS = "mapreduce.reduce.java.opts";
+
+  public static final String MAPREDUCE_JOB_DIR = "mapreduce.job.dir";
+
+  public static final String REDUCE_MAX_ATTEMPTS = "mapreduce.reduce.maxattempts";
+
+  public static final String SHUFFLE_PARALLEL_COPIES = "mapreduce.reduce.shuffle.parallelcopies";
+
+  public static final String REDUCE_DEBUG_SCRIPT = "mapreduce.reduce.debug.script";
+
+  public static final String REDUCE_SPECULATIVE = "mapreduce.reduce.speculative";
+
+  public static final String SHUFFLE_CONNECT_TIMEOUT = "mapreduce.reduce.shuffle.connect.timeout";
+
+  public static final String SHUFFLE_READ_TIMEOUT = "mapreduce.reduce.shuffle.read.timeout";
+
+  public static final String SHUFFLE_FETCH_FAILURES = "mapreduce.reduce.shuffle.maxfetchfailures";
+
+  public static final String SHUFFLE_NOTIFY_READERROR = "mapreduce.reduce.shuffle.notify.readerror";
+
+  public static final String REDUCE_SKIP_INCR_PROC_COUNT = "mapreduce.reduce.skip.proc-count.auto-incr";
+
+  public static final String REDUCE_SKIP_MAXGROUPS = "mapreduce.reduce.skip.maxgroups";
+
+  public static final String REDUCE_MEMTOMEM_THRESHOLD = "mapreduce.reduce.merge.memtomem.threshold";
+
+  public static final String REDUCE_MEMTOMEM_ENABLED = "mapreduce.reduce.merge.memtomem.enabled";
+
+  public static final String COMBINE_RECORDS_BEFORE_PROGRESS = "mapreduce.task.combine.progress.records";
+
+  public static final String JOB_NAMENODES = "mapreduce.job.hdfs-servers";
+
+  public static final String JOB_JOBTRACKER_ID = "mapreduce.job.kerberos.jtprinicipal"; // sic: misspelled key string retained for compatibility
+
+  public static final String JOB_CANCEL_DELEGATION_TOKEN = "mapreduce.job.complete.cancel.delegation.tokens";
+
+  public static final String JOB_ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
+
+  public static final String DEFAULT_JOB_ACL_VIEW_JOB = " ";
+
+  public static final String JOB_ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
+
+  public static final String DEFAULT_JOB_ACL_MODIFY_JOB = " ";
+  
+  /* Config for the local file in which all the credentials for the job
+   * are stored.
+   */
+  public static final String MAPREDUCE_JOB_CREDENTIALS_BINARY = 
+      "mapreduce.job.credentials.binary";
+
+  public static final String JOB_SUBMITHOST =
+    "mapreduce.job.submithostname";
+  public static final String JOB_SUBMITHOSTADDR =
+    "mapreduce.job.submithostaddress";
+
+  public static final String COUNTERS_MAX_KEY = "mapreduce.job.counters.max";
+  public static final int COUNTERS_MAX_DEFAULT = 120;
+
+  public static final String COUNTER_GROUP_NAME_MAX_KEY = "mapreduce.job.counters.group.name.max";
+  public static final int COUNTER_GROUP_NAME_MAX_DEFAULT = 128;
+
+  public static final String COUNTER_NAME_MAX_KEY = "mapreduce.job.counters.counter.name.max";
+  public static final int COUNTER_NAME_MAX_DEFAULT = 64;
+
+  public static final String COUNTER_GROUPS_MAX_KEY = "mapreduce.job.counters.groups.max";
+  public static final int COUNTER_GROUPS_MAX_DEFAULT = 50;
+  public static final String JOB_UBERTASK_ENABLE =
+    "mapreduce.job.ubertask.enable";
+  public static final String JOB_UBERTASK_MAXMAPS =
+    "mapreduce.job.ubertask.maxmaps";
+  public static final String JOB_UBERTASK_MAXREDUCES =
+    "mapreduce.job.ubertask.maxreduces";
+  public static final String JOB_UBERTASK_MAXBYTES =
+    "mapreduce.job.ubertask.maxbytes";
+
+  public static final String MR_PREFIX = "yarn.app.mapreduce.";
+
+  public static final String MR_AM_PREFIX = MR_PREFIX + "am.";
+
+  /** The number of client retries to the AM before reconnecting to the RM
+   * to fetch the application state.
+   */
+  public static final String MR_CLIENT_TO_AM_IPC_MAX_RETRIES = 
+    MR_PREFIX + "client-am.ipc.max-retries";
+  public static final int DEFAULT_MR_CLIENT_TO_AM_IPC_MAX_RETRIES = 3;
+  
+  /**
+   * The number of client retries to the RM/HS/AM before throwing exception.
+   */
+  public static final String MR_CLIENT_MAX_RETRIES = 
+    MR_PREFIX + "client.max-retries";
+  public static final int DEFAULT_MR_CLIENT_MAX_RETRIES = 3;
+  
+  /** The staging directory for map reduce.*/
+  public static final String MR_AM_STAGING_DIR = 
+    MR_AM_PREFIX+"staging-dir";
+  public static final String DEFAULT_MR_AM_STAGING_DIR = 
+    "/tmp/hadoop-yarn/staging";
+
+  /** The amount of memory the MR app master needs.*/
+  public static final String MR_AM_VMEM_MB =
+    MR_AM_PREFIX+"resource.mb";
+  public static final int DEFAULT_MR_AM_VMEM_MB = 1536;
+
+  /** The number of virtual cores the MR app master needs.*/
+  public static final String MR_AM_CPU_VCORES =
+    MR_AM_PREFIX+"resource.cpu-vcores";
+  public static final int DEFAULT_MR_AM_CPU_VCORES = 1;
+
+  /** Command line arguments passed to the MR app master.*/
+  public static final String MR_AM_COMMAND_OPTS =
+    MR_AM_PREFIX+"command-opts";
+  public static final String DEFAULT_MR_AM_COMMAND_OPTS = "-Xmx1024m";
+
+  /** Admin command opts passed to the MR app master.*/
+  public static final String MR_AM_ADMIN_COMMAND_OPTS =
+      MR_AM_PREFIX+"admin-command-opts";
+  public static final String DEFAULT_MR_AM_ADMIN_COMMAND_OPTS = "";
+
+  /** Root Logging level passed to the MR app master.*/
+  public static final String MR_AM_LOG_LEVEL = 
+    MR_AM_PREFIX+"log.level";
+  public static final String DEFAULT_MR_AM_LOG_LEVEL = "INFO";
+
+  /** The number of splits used when reporting progress in MR. */
+  public static final String MR_AM_NUM_PROGRESS_SPLITS = 
+    MR_AM_PREFIX+"num-progress-splits";
+  public static final int DEFAULT_MR_AM_NUM_PROGRESS_SPLITS = 12;
+
+  /**
+   * Upper limit on the number of threads used to launch containers in the
+   * app master. Expert-level config; you shouldn't need it in most cases.
+   */
+  public static final String MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT =
+    MR_AM_PREFIX+"containerlauncher.thread-count-limit";
+
+  public static final int DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT = 
+      500;
+
+  /** Number of threads to handle job client RPC requests.*/
+  public static final String MR_AM_JOB_CLIENT_THREAD_COUNT =
+    MR_AM_PREFIX + "job.client.thread-count";
+  public static final int DEFAULT_MR_AM_JOB_CLIENT_THREAD_COUNT = 1;
+
+  /** 
+   * Range of ports that the MapReduce AM can use when binding. Leave blank
+   * if you want all possible ports.
+   */
+  public static final String MR_AM_JOB_CLIENT_PORT_RANGE = 
+    MR_AM_PREFIX + "job.client.port-range";
+  
+  /** Enable blacklisting of nodes in the job.*/
+  public static final String MR_AM_JOB_NODE_BLACKLISTING_ENABLE = 
+    MR_AM_PREFIX  + "job.node-blacklisting.enable";
+
+  /** Ignore blacklisting if a certain percentage of nodes have been blacklisted. */
+  public static final String MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT =
+      MR_AM_PREFIX + "job.node-blacklisting.ignore-threshold-node-percent";
+  public static final int DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT =
+      33;
+  
+  /** Enable job recovery.*/
+  public static final String MR_AM_JOB_RECOVERY_ENABLE = 
+    MR_AM_PREFIX + "job.recovery.enable";
+
+  /**
+   * Limit on the number of reducers that can be preempted to ensure that at
+   * least one map task can run if it needs to. Fraction between 0.0 and 1.0.
+   */
+  public static final String MR_AM_JOB_REDUCE_PREEMPTION_LIMIT =
+    MR_AM_PREFIX + "job.reduce.preemption.limit";
+  public static final float DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT = 0.5f;
+  
+  /** AM ACL disabled. **/
+  public static final String JOB_AM_ACCESS_DISABLED = 
+    "mapreduce.job.am-access-disabled";
+  public static final boolean DEFAULT_JOB_AM_ACCESS_DISABLED = false;
+
+  /**
+   * Limit on reduce ramp-up: reduces do not start until this fraction of
+   * maps have finished. Fraction between 0.0 and 1.0.
+   */
+  public static final String MR_AM_JOB_REDUCE_RAMP_UP_LIMIT =
+    MR_AM_PREFIX + "job.reduce.rampup.limit";
+  public static final float DEFAULT_MR_AM_JOB_REDUCE_RAMP_UP_LIMIT = 0.5f;
+
+  /** The class that should be used for speculative execution calculations.*/
+  public static final String MR_AM_JOB_SPECULATOR =
+    MR_AM_PREFIX + "job.speculator.class";
+
+  /** Class used to estimate task resource needs.*/
+  public static final String MR_AM_TASK_ESTIMATOR =
+    MR_AM_PREFIX + "job.task.estimator.class";
+
+  /** The lambda value in the smoothing function of the task estimator.*/
+  public static final String MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS =
+    MR_AM_PREFIX
+    + "job.task.estimator.exponential.smooth.lambda-ms";
+
+  public static final long DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS = 
+  1000L * 60;
+
+  /** true if the smoothing rate should be exponential.*/
+  public static final String MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE =
+    MR_AM_PREFIX + "job.task.estimator.exponential.smooth.rate";
+
+  /** The number of threads used to handle task RPC calls.*/
+  public static final String MR_AM_TASK_LISTENER_THREAD_COUNT =
+    MR_AM_PREFIX + "job.task.listener.thread-count";
+  public static final int DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT = 30;
+
+  /** How often the AM should schedule assigning tasks with allocated
+   * containers.*/
+  public static final String MR_AM_SCHEDULER_INTERVAL =
+    MR_AM_PREFIX + "scheduler.interval-ms";
+  public static final long DEFAULT_MR_AM_SCHEDULER_INTERVAL = 1000l;
+
+  /** Whether the AM should attempt to reuse containers across task attempts */
+  public static final String MR_AM_SCHEDULER_REUSE_ENABLE =
+      MR_AM_PREFIX + "scheduler.reuse.enable";
+  public static final boolean DEFAULT_MR_AM_SCHEDULER_REUSE_ENABLE = false;
+  
+  /**
+   * The maximum number of attempts that should run in a single jvm. -1
+   * indicates no limit.
+   */
+  public static final String MR_AM_SCHEDULER_REUSE_MAX_ATTEMPTS_PER_CONTAINER =
+      MR_AM_PREFIX + "scheduler.reuse.max-attempts-per-container";
+  public static final int DEFAULT_MR_AM_SCHEDULER_REUSE_MAX_ATTEMPTS_PER_CONTAINER = -1;
+  
+  /** How often the AM should send heartbeats to the RM.*/
+  public static final String MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS =
+    MR_AM_PREFIX + "scheduler.heartbeat.interval-ms";
+  public static final int DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS = 1000;
+
+  /**
+   * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
+   * milliseconds before aborting. During this interval, AM will still try
+   * to contact the RM.
+   */
+  public static final String MR_AM_TO_RM_WAIT_INTERVAL_MS =
+    MR_AM_PREFIX + "scheduler.connection.wait.interval-ms";
+  public static final int DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS = 360000;
+
+  /**
+   * Boolean. Create the base dirs in the JobHistoryEventHandler.
+   * Set to false for multi-user clusters. This is an internal config that
+   * is set by the MR framework and read by it too.
+   */
+  public static final String MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR = 
+    MR_AM_PREFIX + "create-intermediate-jh-base-dir";
+  
+  public static final String MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS =
+      MR_AM_PREFIX + "history.max-unflushed-events";
+  public static final int DEFAULT_MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS =
+      200;
+
+  public static final String MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER =
+      MR_AM_PREFIX + "history.job-complete-unflushed-multiplier";
+  public static final int DEFAULT_MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER =
+      30;
+
+  public static final String MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS =
+      MR_AM_PREFIX + "history.complete-event-flush-timeout";
+  public static final long DEFAULT_MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS =
+      30 * 1000l;
+
+  public static final String MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
+      MR_AM_PREFIX + "history.use-batched-flush.queue-size.threshold";
+  public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
+      50;
+  
+  public static final String MR_AM_ENV =
+      MR_AM_PREFIX + "env";
+  
+  public static final String MR_AM_ADMIN_USER_ENV =
+      MR_AM_PREFIX + "admin.user.env";
+
+  public static final String MAPRED_MAP_ADMIN_JAVA_OPTS =
+      "mapreduce.admin.map.child.java.opts";
+
+  public static final String MAPRED_REDUCE_ADMIN_JAVA_OPTS =
+      "mapreduce.admin.reduce.child.java.opts";
+
+  public static final String DEFAULT_MAPRED_ADMIN_JAVA_OPTS =
+      "-Djava.net.preferIPv4Stack=true " +
+          "-Dhadoop.metrics.log.level=WARN ";
+
+  public static final String MAPRED_ADMIN_USER_SHELL =
+      "mapreduce.admin.user.shell";
+
+  public static final String DEFAULT_SHELL = "/bin/bash";
+
+  public static final String MAPRED_ADMIN_USER_ENV =
+      "mapreduce.admin.user.env";
+
+  public static final String DEFAULT_MAPRED_ADMIN_USER_ENV =
+      "LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib/native";
+
+  public static final String WORKDIR = "work";
+
+  public static final String OUTPUT = "output";
+
+  public static final String HADOOP_WORK_DIR = "HADOOP_WORK_DIR";
+
+  // Environment variables used by Pipes. (TODO: these
+  // do not appear to be used by current pipes source code!)
+  public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV";
+  public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV";
+
+  public static final String APPLICATION_ATTEMPT_ID_ENV = "APPLICATION_ATTEMPT_ID_ENV";
+
+  // This should be the directory where splits file gets localized on the node
+  // running ApplicationMaster.
+  public static final String JOB_SUBMIT_DIR = "jobSubmitDir";
+
+  // This should be the name of the localized job-configuration file on the node
+  // running ApplicationMaster and Task
+  public static final String JOB_CONF_FILE = "job.xml";
+
+  // This should be the name of the localized job-jar file on the node running
+  // individual containers/tasks.
+  public static final String JOB_JAR = "job.jar";
+
+  public static final String JOB_SPLIT = "job.split";
+
+  public static final String JOB_SPLIT_METAINFO = "job.splitmetainfo";
+
+  public static final String APPLICATION_MASTER_CLASS =
+      "org.apache.hadoop.mapreduce.v2.app2.MRAppMaster";
+
+  // The token file for the application. Should contain tokens for access to
+  // remote file system and may optionally contain application specific tokens.
+  // For now, generated by the AppManagers and used by NodeManagers and the
+  // Containers.
+  public static final String APPLICATION_TOKENS_FILE = "appTokens";
+  
+  /** The log directory for the containers */
+  public static final String TASK_LOG_DIR = MR_PREFIX + "container.log.dir";
+  
+  public static final String TASK_LOG_SIZE = MR_PREFIX + "container.log.filesize";
+  
+  public static final String MAPREDUCE_V2_CHILD_CLASS = 
+      "org.apache.hadoop.mapred.YarnChild";
+
+  public static final String APPLICATION_ATTEMPT_ID =
+      "mapreduce.job.application.attempt.id";
+
+  /**
+   * Job end notification.
+   */
+  public static final String MR_JOB_END_NOTIFICATION_URL =
+    "mapreduce.job.end-notification.url";
+
+  public static final String MR_JOB_END_NOTIFICATION_PROXY =
+    "mapreduce.job.end-notification.proxy";
+
+  public static final String MR_JOB_END_RETRY_ATTEMPTS =
+    "mapreduce.job.end-notification.retry.attempts";
+
+  public static final String MR_JOB_END_RETRY_INTERVAL =
+    "mapreduce.job.end-notification.retry.interval";
+
+  public static final String MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS =
+    "mapreduce.job.end-notification.max.attempts";
+
+  public static final String MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL =
+    "mapreduce.job.end-notification.max.retry.interval";
+
+  /*
+   * MR AM Service Authorization
+   */
+  public static final String MR_AM_SECURITY_SERVICE_AUTHORIZATION_TASK_UMBILICAL =
+      "security.job.task.protocol.acl";
+  public static final String MR_AM_SECURITY_SERVICE_AUTHORIZATION_CLIENT =
+      "security.job.client.protocol.acl";
+
+  /**
+   * CLASSPATH for all YARN MapReduce applications.
+   */
+  public static final String MAPREDUCE_APPLICATION_CLASSPATH = 
+      "mapreduce.application.classpath";
+
+  /**
+   * Default CLASSPATH for all YARN MapReduce applications.
+   */
+  public static final String[] DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH = {
+      "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*",
+      "$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*",
+  };
+
+
+  // TODO Fix this. Not accessible in JobClient
+  /* do we need a HS delegation token for this client */
+  @InterfaceAudience.Private
+  static final String HS_DELEGATION_TOKEN_REQUIRED
+      = "mapreduce.history.server.delegationtoken.required";
+
+}


