tez-commits mailing list archives

From: ss...@apache.org
Subject: [09/50] [abbrv] TEZ-444. Rename *.new* packages back to what they should be, remove dead code from the old packages - mapreduce module (part of TEZ-398). (sseth)
Date: Wed, 25 Sep 2013 07:31:16 GMT
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
index 6817151..1e0b146 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
@@ -32,23 +32,29 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.mapreduce.processor.MRTask;
-import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.KVReader;
+import org.apache.tez.engine.newapi.LogicalInput;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.mapreduce.common.Utils;
+import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
+import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
+
+import com.google.common.base.Preconditions;
 
 /**
 * {@link SimpleInput} is an {@link Input} which provides key/value pairs
@@ -57,161 +63,152 @@ import org.apache.tez.mapreduce.processor.MRTaskReporter;
  * It is compatible with all standard Apache Hadoop MapReduce 
  * {@link InputFormat} implementations.
  */
-@SuppressWarnings({ "unchecked", "rawtypes" })
-public class SimpleInput implements Input {
+
+public class SimpleInput implements LogicalInput {
 
   private static final Log LOG = LogFactory.getLog(SimpleInput.class);
   
-  MRTask task;
   
-  boolean useNewApi;
+  private TezInputContext inputContext;
   
-  JobConf jobConf;
+  private JobConf jobConf;
+  private Configuration incrementalConf;
+  private boolean recordReaderCreated = false;
+  
+  boolean useNewApi;
   
   org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext;
 
-  org.apache.hadoop.mapreduce.InputFormat newInputFormat;
-  org.apache.hadoop.mapreduce.RecordReader newRecordReader;
+  @SuppressWarnings("rawtypes")
+  private org.apache.hadoop.mapreduce.InputFormat newInputFormat;
+  @SuppressWarnings("rawtypes")
+  private org.apache.hadoop.mapreduce.RecordReader newRecordReader;
+  protected org.apache.hadoop.mapreduce.InputSplit newInputSplit;
   
-  org.apache.hadoop.mapred.InputFormat oldInputFormat;
-  org.apache.hadoop.mapred.RecordReader oldRecordReader;
+  @SuppressWarnings("rawtypes")
+  private InputFormat oldInputFormat;
+  @SuppressWarnings("rawtypes")
+  protected RecordReader oldRecordReader;
 
   protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
   
-  Object key;
-  Object value;
-  
   private TezCounter inputRecordCounter;
   private TezCounter fileInputByteCounter; 
   private List<Statistics> fsStats;
-  private MRTaskReporter reporter;
 
-  public SimpleInput(TezEngineTaskContext task, int index)
-  {}
-  
-  public void setTask(MRTask task) {
-    this.task = task;
-  }
+  @Override
+  public List<Event> initialize(TezInputContext inputContext) throws IOException {
+    this.inputContext = inputContext;
+    Configuration conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
+    this.jobConf = new JobConf(conf);
 
-  public void initialize(Configuration conf, Master master) throws IOException,
-      InterruptedException {
-    if (task == null) {
-      return;
-    }
-    
-    if (conf instanceof JobConf) {
-      jobConf = (JobConf)conf;
-    } else {
-      jobConf = new JobConf(conf);
-    }
-    
     // Read split information.
-    TaskSplitMetaInfo[] allMetaInfo = readSplits(jobConf);
-    TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[task.getTaskAttemptId()
-        .getTaskID().getId()];
-    splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
+    TaskSplitMetaInfo[] allMetaInfo = readSplits(conf);
+    TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[inputContext.getTaskIndex()];
+    this.splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
         thisTaskMetaInfo.getStartOffset());
     
+    // TODO NEWTEZ Rename this to be specific to SimpleInput. This Input, in
+    // theory, can be used by the MapProcessor, ReduceProcessor or a custom
+    // processor. (The processor could provide the counter though)
+    this.inputRecordCounter = inputContext.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
+    this.fileInputByteCounter = inputContext.getCounters().findCounter(FileInputFormatCounter.BYTES_READ);
     
-    useNewApi = jobConf.getUseNewMapper();
-    taskAttemptContext = task.getTaskAttemptContext();
-    
-    inputRecordCounter = task.getInputRecordsCounter();
-    fileInputByteCounter = task.getFileInputBytesCounter();
-
-    reporter = task.getMRReporter();
+    useNewApi = this.jobConf.getUseNewMapper();
 
     if (useNewApi) {
+      TaskAttemptContext taskAttemptContext = createTaskAttemptContext();
+      Class<? extends org.apache.hadoop.mapreduce.InputFormat<?, ?>> inputFormatClazz;
       try {
-        newInputFormat = 
-            ReflectionUtils.newInstance(
-                taskAttemptContext.getInputFormatClass(), jobConf);
-      } catch (ClassNotFoundException cnfe) {
-        throw new IOException(cnfe);
+        inputFormatClazz = taskAttemptContext.getInputFormatClass();
+      } catch (ClassNotFoundException e) {
+        throw new IOException("Unable to instantiate InputFormat class", e);
       }
-      
+
+      newInputFormat = ReflectionUtils.newInstance(inputFormatClazz, this.jobConf);
+
       newInputSplit = getNewSplitDetails(splitMetaInfo);
+
       List<Statistics> matchedStats = null;
       if (newInputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
-        matchedStats = MRTask.getFsStatistics(
+        matchedStats = Utils.getFsStatistics(
             ((org.apache.hadoop.mapreduce.lib.input.FileSplit)
-                newInputSplit).getPath(), jobConf);
+                newInputSplit).getPath(), this.jobConf);
       }
       fsStats = matchedStats;
-      newRecordReader = 
-          newInputFormat.createRecordReader(newInputSplit, taskAttemptContext);
-    } else {
-      oldInputFormat = jobConf.getInputFormat();
-      org.apache.hadoop.mapred.InputSplit oldInputSplit =
+      
+      try {
+        newRecordReader = newInputFormat.createRecordReader(newInputSplit, taskAttemptContext);
+        newRecordReader.initialize(newInputSplit, taskAttemptContext);
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted while creating record reader", e);
+      }
+    } else { // OLD API
+      oldInputFormat = this.jobConf.getInputFormat();
+      InputSplit oldInputSplit =
           getOldSplitDetails(splitMetaInfo);
       
+      
       List<Statistics> matchedStats = null;
       if (oldInputSplit instanceof FileSplit) {
-        matchedStats = 
-            MRTask.getFsStatistics(
-                ((FileSplit)oldInputSplit).getPath(), jobConf);
+        matchedStats = Utils.getFsStatistics(((FileSplit) oldInputSplit).getPath(), this.jobConf);
       }
       fsStats = matchedStats;
-
+      
       long bytesInPrev = getInputBytes();
-      oldRecordReader = 
-          jobConf.getInputFormat().getRecordReader(
-              oldInputSplit, jobConf, reporter);
+      oldRecordReader = oldInputFormat.getRecordReader(oldInputSplit,
+          this.jobConf, new MRReporter(inputContext, oldInputSplit));
       long bytesInCurr = getInputBytes();
       fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
-
-      updateJobWithSplit(jobConf, oldInputSplit);
-    }
+      setIncrementalConfigParams(oldInputSplit);
+    }    
+    return null;
   }
 
-  public boolean hasNext() throws IOException, InterruptedException {
-    boolean hasNext = false;
-    long bytesInPrev = getInputBytes();
+  @Override
+  public KVReader getReader() throws IOException {
+    Preconditions
+        .checkState(recordReaderCreated == false,
+            "Only a single instance of record reader can be created for this input.");
+    recordReaderCreated = true;
+    return new MRInputKVReader();
+  }
 
-    if (useNewApi) { 
-        hasNext = newRecordReader.nextKeyValue();
-    } else {
-      hasNext = oldRecordReader.next(key, value);
-    }
-    
-    long bytesInCurr = getInputBytes();
-    fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
-    reporter.setProgress(getProgress());
 
-    if (hasNext) {
-      inputRecordCounter.increment(1);
-    }
-    
-    return hasNext;
+  @Override
+  public void handleEvents(List<Event> inputEvents) {
+    // Not expecting any events at the moment.
   }
 
-  private SimpleValueIterator vIter = new SimpleValueIterator();
-  private SimpleIterable valuesIterable =
-      new SimpleIterable(vIter);
-
-  private org.apache.hadoop.mapreduce.InputSplit newInputSplit;
 
-  public void setKey(Object key) {
-    this.key = key;
-  }
-  
-  public void setValue(Object value) {
-    this.value = value;
+  @Override
+  public void setNumPhysicalInputs(int numInputs) {
+    // Not required at the moment. May be required if splits are sent via events.
   }
 
-  public Object getNextKey() throws IOException, InterruptedException {
+  @Override
+  public List<Event> close() throws IOException {
+    long bytesInPrev = getInputBytes();
     if (useNewApi) {
-      return newRecordReader.getCurrentKey();
+      newRecordReader.close();
     } else {
-      return key;
+      oldRecordReader.close();
     }
+    long bytesInCurr = getInputBytes();
+    fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+    
+    return null;
   }
 
-  public Iterable getNextValues() throws IOException,
-      InterruptedException {
-    value = newRecordReader.getCurrentValue();
-    vIter.setValue(value);
-    return valuesIterable;
+  /**
+   * SimpleInput sets some additional parameters like split location when using
+   * the new API. This method returns the list of additional updates, and
+   * should be used by Processors using the old MapReduce API with SimpleInput.
+   * 
+   * @return the additional fields set by SimpleInput
+   */
+  public Configuration getConfigUpdates() {
+    return new Configuration(incrementalConf);
   }
 
   public float getProgress() throws IOException, InterruptedException {
@@ -222,27 +219,22 @@ public class SimpleInput implements Input {
     }
   }
 
-  public void close() throws IOException {
-    long bytesInPrev = getInputBytes();
-    if (useNewApi) {
-      newRecordReader.close();
-    } else {
-      oldRecordReader.close();
-    }
-    long bytesInCurr = getInputBytes();
-    fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+  
+  private TaskAttemptContext createTaskAttemptContext() {
+    return new TaskAttemptContextImpl(this.jobConf, inputContext, true);
   }
+  
 
-  static class SimpleValueIterator implements Iterator {
+  private static class SimpleValueIterator implements Iterator<Object> {
 
     private Object value;
-    
+
     public void setValue(Object value) {
       this.value = value;
     }
-    
+
     public boolean hasNext() {
-      return false;
+      return value != null;
     }
 
     public Object next() {
@@ -256,28 +248,23 @@ public class SimpleInput implements Input {
     }
   }
 
-  static class SimpleIterable implements Iterable {
-    private final Iterator iterator;
-    public SimpleIterable(Iterator iterator) {
+  private static class SimpleIterable implements Iterable<Object> {
+    private final Iterator<Object> iterator;
+    public SimpleIterable(Iterator<Object> iterator) {
       this.iterator = iterator;
     }
-    
-    public Iterator iterator() {
+
+    @Override
+    public Iterator<Object> iterator() {
       return iterator;
     }
   }
-  
 
-  public RecordReader getOldRecordReader() {
-    return oldRecordReader;
-  }
-  
-  public org.apache.hadoop.mapreduce.RecordReader getNewRecordReader() {
-    return newRecordReader;
-  }
+
+
   
-  public org.apache.hadoop.mapred.InputSplit 
-  getOldSplitDetails(TaskSplitIndex splitMetaInfo) 
+  @SuppressWarnings("unchecked")
+  private InputSplit getOldSplitDetails(TaskSplitIndex splitMetaInfo)
       throws IOException {
     Path file = new Path(splitMetaInfo.getSplitLocation());
     FileSystem fs = FileSystem.getLocal(jobConf);
@@ -306,14 +293,15 @@ public class SimpleInput implements Input {
     deserializer.open(inFile);
     org.apache.hadoop.mapred.InputSplit split = deserializer.deserialize(null);
     long pos = inFile.getPos();
-    reporter.getCounter(TaskCounter.SPLIT_RAW_BYTES).increment(pos - offset);
+    inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
+        .increment(pos - offset);
     inFile.close();
     return split;
   }
 
-  public org.apache.hadoop.mapreduce.InputSplit 
-  getNewSplitDetails(TaskSplitIndex splitMetaInfo) 
-      throws IOException {
+  @SuppressWarnings("unchecked")
+  private org.apache.hadoop.mapreduce.InputSplit getNewSplitDetails(
+      TaskSplitIndex splitMetaInfo) throws IOException {
     Path file = new Path(splitMetaInfo.getSplitLocation());
     long offset = splitMetaInfo.getStartOffset();
     
@@ -343,17 +331,23 @@ public class SimpleInput implements Input {
     org.apache.hadoop.mapreduce.InputSplit split = 
         deserializer.deserialize(null);
     long pos = inFile.getPos();
-    reporter.getCounter(TaskCounter.SPLIT_RAW_BYTES).increment(pos - offset);
+    inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
+        .increment(pos - offset);
     inFile.close();
     return split;
   }
 
-  private void updateJobWithSplit(final JobConf job, InputSplit inputSplit) {
+  private void setIncrementalConfigParams(InputSplit inputSplit) {
     if (inputSplit instanceof FileSplit) {
       FileSplit fileSplit = (FileSplit) inputSplit;
-      job.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath().toString());
-      job.setLong(JobContext.MAP_INPUT_START, fileSplit.getStart());
-      job.setLong(JobContext.MAP_INPUT_PATH, fileSplit.getLength());
+      this.incrementalConf = new Configuration(false);
+
+      this.incrementalConf.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath()
+          .toString());
+      this.incrementalConf.setLong(JobContext.MAP_INPUT_START,
+          fileSplit.getStart());
+      this.incrementalConf.setLong(JobContext.MAP_INPUT_PATH,
+          fileSplit.getLength());
     }
     LOG.info("Processing split: " + inputSplit);
   }
@@ -367,16 +361,6 @@ public class SimpleInput implements Input {
     return bytesRead;
   }
 
-  public void initializeNewRecordReader(
-      org.apache.hadoop.mapreduce.InputSplit split, TaskAttemptContext context) 
-  throws IOException, InterruptedException {
-    newRecordReader.initialize(split, context);
-  }
-  
-  public org.apache.hadoop.mapreduce.InputSplit getNewInputSplit() {
-    return newInputSplit;
-  }
-
   protected TaskSplitMetaInfo[] readSplits(Configuration conf)
       throws IOException {
     TaskSplitMetaInfo[] allTaskSplitMetaInfo;
@@ -384,4 +368,71 @@ public class SimpleInput implements Input {
         FileSystem.getLocal(conf));
     return allTaskSplitMetaInfo;
   }
+  
+  private class MRInputKVReader implements KVReader {
+    
+    Object key;
+    Object value;
+
+    private SimpleValueIterator valueIterator = new SimpleValueIterator();
+    private SimpleIterable valueIterable = new SimpleIterable(valueIterator);
+
+    private final boolean localNewApi;
+    
+    MRInputKVReader() {
+      localNewApi = useNewApi;
+      if (!localNewApi) {
+        key = oldRecordReader.createKey();
+        value = oldRecordReader.createValue();
+      }
+    }
+    
+    // Setup the values iterator once, and set value on the same object each time
+    // to prevent lots of objects being created.
+
+    
+    @SuppressWarnings("unchecked")
+    @Override
+    public boolean next() throws IOException {
+      boolean hasNext = false;
+      long bytesInPrev = getInputBytes();
+      if (localNewApi) {
+        try {
+          hasNext = newRecordReader.nextKeyValue();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted while checking for next key-value", e);
+        }
+      } else {
+        hasNext = oldRecordReader.next(key, value);
+      }
+      long bytesInCurr = getInputBytes();
+      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+      
+      if (hasNext) {
+        inputRecordCounter.increment(1);
+      }
+      
+      return hasNext;
+    }
+
+    @Override
+    public KVRecord getCurrentKV() throws IOException {
+      KVRecord kvRecord = null;
+      if (localNewApi) {
+        try {
+          valueIterator.setValue(newRecordReader.getCurrentValue());
+          kvRecord = new KVRecord(newRecordReader.getCurrentKey(), valueIterable);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted while fetching next key-value", e);
+        }
+        
+      } else {
+        valueIterator.setValue(value);
+        kvRecord = new KVRecord(key, valueIterable);
+      }
+      return kvRecord;
+    }
+  };
 }
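
For context, a minimal sketch (not part of the patch) of how a processor might drive the reworked SimpleInput through the LogicalInput lifecycle above: initialize(), getReader(), next()/getCurrentKV(), close(). The driver class is hypothetical, the TezInputContext would normally be supplied by the framework, and KVRecord is assumed to be KVReader's nested record type with getKey()/getValues() accessors; those accessors are not defined in this diff.

// Hypothetical driver for the reworked SimpleInput; not part of this patch.
import java.io.IOException;

import org.apache.tez.engine.newapi.KVReader;
import org.apache.tez.engine.newapi.TezInputContext;
import org.apache.tez.mapreduce.input.SimpleInput;

public class SimpleInputDriver {

  // Walks SimpleInput through the LogicalInput lifecycle from the diff above.
  public static long run(TezInputContext inputContext) throws IOException {
    // In practice the framework instantiates the Input and supplies the context.
    SimpleInput input = new SimpleInput();

    // initialize() reads the split meta info, picks the old or new MR API path
    // and creates the record reader; it currently returns null (no events).
    input.initialize(inputContext);

    // Only one reader may be obtained; a second call trips the
    // Preconditions.checkState in getReader().
    KVReader reader = input.getReader();

    long records = 0;
    while (reader.next()) {
      // getCurrentKV() reuses a single value iterator/iterable per reader to
      // avoid per-record allocations (see the comment in MRInputKVReader).
      KVReader.KVRecord kvRecord = reader.getCurrentKV();
      // getKey()/getValues() are assumed accessors, not shown in this diff.
      Object key = kvRecord.getKey();
      for (Object value : kvRecord.getValues()) {
        records++;
      }
    }

    // close() updates the file-input byte counter before returning.
    input.close();
    return records;
  }
}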

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInputLegacy.java
new file mode 100644
index 0000000..4e61aa7
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInputLegacy.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.input;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.mapred.RecordReader;
+
+public class SimpleInputLegacy extends SimpleInput {
+
+  @Private
+  public org.apache.hadoop.mapreduce.InputSplit getNewInputSplit() {
+    return this.newInputSplit;
+  }  
+  
+  @SuppressWarnings("rawtypes")
+  @Private
+  public RecordReader getOldRecordReader() {
+    return this.oldRecordReader;
+  }
+}
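
For the old-API path, a hedged sketch of how a processor written against org.apache.hadoop.mapred might consume this input via SimpleInputLegacy, folding getConfigUpdates() back into its JobConf before driving the old RecordReader. Only getOldRecordReader() and getConfigUpdates() come from this patch; the consumer class and the cast from LogicalInput are illustrative.

// Hypothetical old-API consumer; not part of this patch.
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.tez.engine.newapi.LogicalInput;
import org.apache.tez.mapreduce.input.SimpleInputLegacy;

public class LegacyInputConsumer {

  @SuppressWarnings({ "rawtypes", "unchecked" })
  public static long consume(LogicalInput logicalInput, JobConf jobConf) throws IOException {
    // Assumes the framework has already called initialize() on the input.
    // The legacy path bypasses KVReader and talks to the mapred RecordReader directly.
    SimpleInputLegacy input = (SimpleInputLegacy) logicalInput;

    // Fold the split-specific parameters (map input file, start offset, length)
    // recorded by setIncrementalConfigParams() back into the JobConf seen by
    // old-API user code.
    Configuration updates = input.getConfigUpdates();
    if (updates != null) {
      for (Map.Entry<String, String> entry : updates) {
        jobConf.set(entry.getKey(), entry.getValue());
      }
    }

    RecordReader recordReader = input.getOldRecordReader();
    Object key = recordReader.createKey();
    Object value = recordReader.createValue();

    long records = 0;
    while (recordReader.next(key, value)) {
      records++; // a real processor would hand key/value to the old Mapper here
    }
    return records;
  }
}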

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newcombine/MRCombiner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newcombine/MRCombiner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newcombine/MRCombiner.java
deleted file mode 100644
index 5566fd8..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newcombine/MRCombiner.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.newcombine;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RawKeyValueIterator;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapreduce.Counter;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.ReduceContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskCounter;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
-import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
-import org.apache.hadoop.util.Progress;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.ValuesIterator;
-import org.apache.tez.engine.common.combine.Combiner;
-import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.newapi.TezInputContext;
-import org.apache.tez.engine.newapi.TezOutputContext;
-import org.apache.tez.engine.newapi.TezTaskContext;
-import org.apache.tez.mapreduce.hadoop.MRConfig;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
-import org.apache.tez.mapreduce.newprocessor.MRTaskReporter;
-
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class MRCombiner implements Combiner {
-
-  private static Log LOG = LogFactory.getLog(MRCombiner.class);
-  
-  private final Configuration conf;
-  private final Class<?> keyClass;
-  private final Class<?> valClass;
-  private final RawComparator<?> comparator;
-  private final boolean useNewApi;
-  
-  private final TezCounter combineInputKeyCounter;
-  private final TezCounter combineInputValueCounter;
-  
-  private final MRTaskReporter reporter;
-  private final TaskAttemptID mrTaskAttemptID;
-
-  public MRCombiner(TezTaskContext taskContext) throws IOException {
-    this.conf = TezUtils.createConfFromUserPayload(taskContext.getUserPayload());
-
-    assert(taskContext instanceof TezInputContext || taskContext instanceof TezOutputContext);
-    if (taskContext instanceof TezOutputContext) {
-      this.keyClass = ConfigUtils.getIntermediateOutputKeyClass(conf);
-      this.valClass = ConfigUtils.getIntermediateOutputValueClass(conf);
-      this.comparator = ConfigUtils.getIntermediateOutputKeyComparator(conf);
-      this.reporter = new MRTaskReporter((TezOutputContext)taskContext);
-    } else {
-      this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
-      this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
-      this.comparator = ConfigUtils.getIntermediateInputKeyComparator(conf);
-      this.reporter = new MRTaskReporter((TezInputContext)taskContext);
-    }
-
-    this.useNewApi = ConfigUtils.useNewApi(conf);
-    
-    combineInputKeyCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
-    combineInputValueCounter = taskContext.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
-    
-    boolean isMap = conf.getBoolean(MRConfig.IS_MAP_PROCESSOR,false);
-    this.mrTaskAttemptID = new TaskAttemptID(
-        new TaskID(String.valueOf(taskContext.getApplicationId()
-            .getClusterTimestamp()), taskContext.getApplicationId().getId(),
-            isMap ? TaskType.MAP : TaskType.REDUCE,
-            taskContext.getTaskIndex()), taskContext.getTaskAttemptNumber());
-    
-    LOG.info("Using combineKeyClass: " + keyClass + ", combineValueClass: " + valClass + ", combineComparator: " +comparator + ", useNewApi: " + useNewApi);
-  }
-
-  @Override
-  public void combine(TezRawKeyValueIterator rawIter, Writer writer)
-      throws InterruptedException, IOException {
-    if (useNewApi) {
-      runNewCombiner(rawIter, writer);
-    } else {
-      runOldCombiner(rawIter, writer);
-    }
-    
-  }
-
-  ///////////////// Methods for old API //////////////////////
-  
-  private void runOldCombiner(final TezRawKeyValueIterator rawIter, final Writer writer) throws IOException {
-    Class<? extends Reducer> reducerClazz = (Class<? extends Reducer>) conf.getClass("mapred.combiner.class", null, Reducer.class);
-    
-    Reducer combiner = ReflectionUtils.newInstance(reducerClazz, conf);
-    
-    OutputCollector collector = new OutputCollector() {
-      @Override
-      public void collect(Object key, Object value) throws IOException {
-        writer.append(key, value);
-      }
-    };
-    
-    CombinerValuesIterator values = new CombinerValuesIterator(rawIter, keyClass, valClass, comparator);
-    
-    while (values.moveToNext()) {
-      combiner.reduce(values.getKey(), values.getValues().iterator(), collector, reporter);
-    }
-  }
-  
-  private final class CombinerValuesIterator<KEY,VALUE> extends ValuesIterator<KEY, VALUE> {
-    public CombinerValuesIterator(TezRawKeyValueIterator rawIter,
-        Class<KEY> keyClass, Class<VALUE> valClass,
-        RawComparator<KEY> comparator) throws IOException {
-      super(rawIter, comparator, keyClass, valClass, conf,
-          combineInputKeyCounter, combineInputValueCounter);
-    }
-  }
-  
-  ///////////////// End of methods for old API //////////////////////
-  
-  ///////////////// Methods for new API //////////////////////
-  
-  private void runNewCombiner(final TezRawKeyValueIterator rawIter, final Writer writer) throws InterruptedException, IOException {
-    
-    RecordWriter recordWriter = new RecordWriter() {
-
-      @Override
-      public void write(Object key, Object value) throws IOException,
-          InterruptedException {
-        writer.append(key, value);
-      }
-
-      @Override
-      public void close(TaskAttemptContext context) throws IOException,
-          InterruptedException {
-        // Will be closed by whoever invokes the combiner.
-      }
-    };
-    
-    Class<? extends org.apache.hadoop.mapreduce.Reducer> reducerClazz = (Class<? extends org.apache.hadoop.mapreduce.Reducer>) conf
-        .getClass(MRJobConfig.COMBINE_CLASS_ATTR, null,
-            org.apache.hadoop.mapreduce.Reducer.class);
-    org.apache.hadoop.mapreduce.Reducer reducer = ReflectionUtils.newInstance(reducerClazz, conf);
-    
-    org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
-        createReduceContext(
-            conf,
-            mrTaskAttemptID,
-            rawIter,
-            new MRCounters.MRCounter(combineInputKeyCounter),
-            new MRCounters.MRCounter(combineInputValueCounter),
-            recordWriter,
-            reporter,
-            (RawComparator)comparator,
-            keyClass,
-            valClass);
-    
-    reducer.run(reducerContext);
-    recordWriter.close(reducerContext);
-  }
-
-  private static <KEYIN, VALUEIN, KEYOUT, VALUEOUT> org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
-      Configuration conf,
-      TaskAttemptID mrTaskAttemptID,
-      final TezRawKeyValueIterator rawIter,
-      Counter combineInputKeyCounter,
-      Counter combineInputValueCounter,
-      RecordWriter<KEYOUT, VALUEOUT> recordWriter,
-      MRTaskReporter reporter,
-      RawComparator<KEYIN> comparator,
-      Class<KEYIN> keyClass,
-      Class<VALUEIN> valClass) throws InterruptedException, IOException {
-
-    RawKeyValueIterator r = new RawKeyValueIterator() {
-
-      @Override
-      public boolean next() throws IOException {
-        return rawIter.next();
-      }
-
-      @Override
-      public DataInputBuffer getValue() throws IOException {
-        return rawIter.getValue();
-      }
-
-      @Override
-      public Progress getProgress() {
-        return rawIter.getProgress();
-      }
-
-      @Override
-      public DataInputBuffer getKey() throws IOException {
-        return rawIter.getKey();
-      }
-
-      @Override
-      public void close() throws IOException {
-        rawIter.close();
-      }
-    };
-
-    ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> rContext = new ReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
-        conf, mrTaskAttemptID, r, combineInputKeyCounter,
-        combineInputValueCounter, recordWriter, null, reporter, comparator,
-        keyClass, valClass);
-
-    org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext = new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
-        .getReducerContext(rContext);
-    return reducerContext;
-  }
-
-  
- 
-}
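
MRCombiner above is being deleted only as part of the package rename, but its signatures show the Combiner contract. A minimal sketch, using the pre-rename package this hunk removes, of how a caller that already holds the sort iterator and IFile writer might invoke it; the caller class is hypothetical.

// Hypothetical caller of the (pre-rename) MRCombiner; not part of this patch.
import java.io.IOException;

import org.apache.tez.engine.common.combine.Combiner;
import org.apache.tez.engine.common.sort.impl.IFile.Writer;
import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.engine.newapi.TezTaskContext;
import org.apache.tez.mapreduce.newcombine.MRCombiner;

public class CombinerInvocation {

  // The sort/merge code would call this with its in-memory iterator and the
  // IFile writer it is spilling to.
  public static void runCombiner(TezTaskContext taskContext,
      TezRawKeyValueIterator rawIter, Writer writer)
      throws IOException, InterruptedException {
    // The constructor reads the user payload conf and decides between the
    // old and new MapReduce combiner APIs.
    Combiner combiner = new MRCombiner(taskContext);
    combiner.combine(rawIter, writer);
  }
}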

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/ShuffledMergedInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/ShuffledMergedInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/ShuffledMergedInputLegacy.java
deleted file mode 100644
index a83620e..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/ShuffledMergedInputLegacy.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * <code>ShuffleMergedInput</code> in a {@link LogicalInput} which shuffles
- * intermediate sorted data, merges them and provides key/<values> to the
- * consumer.
- * 
- * The Copy and Merge will be triggered by the initialization - which is handled
- * by the Tez framework. Input is not consumable until the Copy and Merge are
- * complete. Methods are provided to check for this, as well as to wait for
- * completion. Attempting to get a reader on a non-complete input will block.
- * 
- */
-
-package org.apache.tez.mapreduce.newinput;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.lib.input.ShuffledMergedInput;
-
-public class ShuffledMergedInputLegacy extends ShuffledMergedInput {
-
-  @Private
-  public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException {
-    // wait for input so that iterator is available
-    waitForInputReady();
-    return rawIter;
-  }
-}
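
The javadoc of the deleted ShuffledMergedInputLegacy describes the blocking getIterator() hook. A hedged sketch of the kind of legacy consumer it supports, draining the merged key/value stream once copy and merge finish; the iterator methods used below match the RawKeyValueIterator bridge in the deleted MRCombiner above, the import uses the pre-rename package removed by this hunk, and the consumer class is hypothetical.

// Hypothetical legacy consumer of the merged shuffle output; not part of this patch.
import java.io.IOException;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.mapreduce.newinput.ShuffledMergedInputLegacy;

public class LegacyShuffleConsumer {

  public static long drain(ShuffledMergedInputLegacy input)
      throws IOException, InterruptedException {
    // getIterator() blocks until the copy/merge is complete, then hands back
    // the raw merged key/value stream.
    TezRawKeyValueIterator rawIter = input.getIterator();

    long records = 0;
    while (rawIter.next()) {
      // Keys and values arrive as serialized bytes; a real processor would
      // deserialize them with the configured key/value serializations.
      DataInputBuffer keyBytes = rawIter.getKey();
      DataInputBuffer valueBytes = rawIter.getValue();
      records++;
    }
    rawIter.close();
    return records;
  }
}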

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
deleted file mode 100644
index 73d8cc7..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
+++ /dev/null
@@ -1,438 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.mapreduce.newinput;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.KVReader;
-import org.apache.tez.engine.newapi.LogicalInput;
-import org.apache.tez.engine.newapi.TezInputContext;
-import org.apache.tez.mapreduce.common.Utils;
-import org.apache.tez.mapreduce.hadoop.newmapred.MRReporter;
-import org.apache.tez.mapreduce.hadoop.newmapreduce.TaskAttemptContextImpl;
-
-import com.google.common.base.Preconditions;
-
-/**
- * {@link SimpleInput} is an {@link Input} which provides key/values pairs
- * for the consumer.
- *
- * It is compatible with all standard Apache Hadoop MapReduce 
- * {@link InputFormat} implementations.
- */
-
-public class SimpleInput implements LogicalInput {
-
-  private static final Log LOG = LogFactory.getLog(SimpleInput.class);
-  
-  
-  private TezInputContext inputContext;
-  
-  private JobConf jobConf;
-  private Configuration incrementalConf;
-  private boolean recordReaderCreated = false;
-  
-  boolean useNewApi;
-  
-  org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext;
-
-  @SuppressWarnings("rawtypes")
-  private org.apache.hadoop.mapreduce.InputFormat newInputFormat;
-  @SuppressWarnings("rawtypes")
-  private org.apache.hadoop.mapreduce.RecordReader newRecordReader;
-  protected org.apache.hadoop.mapreduce.InputSplit newInputSplit;
-  
-  @SuppressWarnings("rawtypes")
-  private InputFormat oldInputFormat;
-  @SuppressWarnings("rawtypes")
-  protected RecordReader oldRecordReader;
-
-  protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
-  
-  private TezCounter inputRecordCounter;
-  private TezCounter fileInputByteCounter; 
-  private List<Statistics> fsStats;
-
-  @Override
-  public List<Event> initialize(TezInputContext inputContext) throws IOException {
-    this.inputContext = inputContext;
-    Configuration conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
-    this.jobConf = new JobConf(conf);
-
-    // Read split information.
-    TaskSplitMetaInfo[] allMetaInfo = readSplits(conf);
-    TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[inputContext.getTaskIndex()];
-    this.splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
-        thisTaskMetaInfo.getStartOffset());
-    
-    // TODO NEWTEZ Rename this to be specific to SimpleInput. This Input, in
-    // theory, can be used by the MapProcessor, ReduceProcessor or a custom
-    // processor. (The processor could provide the counter though)
-    this.inputRecordCounter = inputContext.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
-    this.fileInputByteCounter = inputContext.getCounters().findCounter(FileInputFormatCounter.BYTES_READ);
-    
-    useNewApi = this.jobConf.getUseNewMapper();
-
-    if (useNewApi) {
-      TaskAttemptContext taskAttemptContext = createTaskAttemptContext();
-      Class<? extends org.apache.hadoop.mapreduce.InputFormat<?, ?>> inputFormatClazz;
-      try {
-        inputFormatClazz = taskAttemptContext.getInputFormatClass();
-      } catch (ClassNotFoundException e) {
-        throw new IOException("Unable to instantiate InputFormat class", e);
-      }
-
-      newInputFormat = ReflectionUtils.newInstance(inputFormatClazz, this.jobConf);
-
-      newInputSplit = getNewSplitDetails(splitMetaInfo);
-
-      List<Statistics> matchedStats = null;
-      if (newInputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
-        matchedStats = Utils.getFsStatistics(
-            ((org.apache.hadoop.mapreduce.lib.input.FileSplit)
-                newInputSplit).getPath(), this.jobConf);
-      }
-      fsStats = matchedStats;
-      
-      try {
-        newRecordReader = newInputFormat.createRecordReader(newInputSplit, taskAttemptContext);
-        newRecordReader.initialize(newInputSplit, taskAttemptContext);
-      } catch (InterruptedException e) {
-        throw new IOException("Interrupted while creating record reader", e);
-      }
-    } else { // OLD API
-      oldInputFormat = this.jobConf.getInputFormat();
-      InputSplit oldInputSplit =
-          getOldSplitDetails(splitMetaInfo);
-      
-      
-      List<Statistics> matchedStats = null;
-      if (oldInputSplit instanceof FileSplit) {
-        matchedStats = Utils.getFsStatistics(((FileSplit) oldInputSplit).getPath(), this.jobConf);
-      }
-      fsStats = matchedStats;
-      
-      long bytesInPrev = getInputBytes();
-      oldRecordReader = oldInputFormat.getRecordReader(oldInputSplit,
-          this.jobConf, new MRReporter(inputContext, oldInputSplit));
-      long bytesInCurr = getInputBytes();
-      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
-      setIncrementalConfigParams(oldInputSplit);
-    }    
-    return null;
-  }
-
-  @Override
-  public KVReader getReader() throws IOException {
-    Preconditions
-        .checkState(recordReaderCreated == false,
-            "Only a single instance of record reader can be created for this input.");
-    recordReaderCreated = true;
-    return new MRInputKVReader();
-  }
-
-
-  @Override
-  public void handleEvents(List<Event> inputEvents) {
-    // Not expecting any events at the moment.
-  }
-
-
-  @Override
-  public void setNumPhysicalInputs(int numInputs) {
-    // Not required at the moment. May be required if splits are sent via events.
-  }
-
-  @Override
-  public List<Event> close() throws IOException {
-    long bytesInPrev = getInputBytes();
-    if (useNewApi) {
-      newRecordReader.close();
-    } else {
-      oldRecordReader.close();
-    }
-    long bytesInCurr = getInputBytes();
-    fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
-    
-    return null;
-  }
-
-  /**
-   * SimpleInputs sets some additional parameters like split location when using
-   * the new API. This methods returns the list of additional updates, and
-   * should be used by Processors using the old MapReduce API with SimpleInput.
-   * 
-   * @return the additional fields set by SimpleInput
-   */
-  public Configuration getConfigUpdates() {
-    return new Configuration(incrementalConf);
-  }
-
-  public float getProgress() throws IOException, InterruptedException {
-    if (useNewApi) {
-      return newRecordReader.getProgress();
-    } else {
-      return oldRecordReader.getProgress();
-    }
-  }
-
-  
-  private TaskAttemptContext createTaskAttemptContext() {
-    return new TaskAttemptContextImpl(this.jobConf, inputContext, true);
-  }
-  
-
-  private static class SimpleValueIterator implements Iterator<Object> {
-
-    private Object value;
-
-    public void setValue(Object value) {
-      this.value = value;
-    }
-
-    public boolean hasNext() {
-      return value != null;
-    }
-
-    public Object next() {
-      Object value = this.value;
-      this.value = null;
-      return value;
-    }
-
-    public void remove() {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  private static class SimpleIterable implements Iterable<Object> {
-    private final Iterator<Object> iterator;
-    public SimpleIterable(Iterator<Object> iterator) {
-      this.iterator = iterator;
-    }
-
-    @Override
-    public Iterator<Object> iterator() {
-      return iterator;
-    }
-  }
-
-
-
-  
-  @SuppressWarnings("unchecked")
-  private InputSplit getOldSplitDetails(TaskSplitIndex splitMetaInfo)
-      throws IOException {
-    Path file = new Path(splitMetaInfo.getSplitLocation());
-    FileSystem fs = FileSystem.getLocal(jobConf);
-    file = fs.makeQualified(file);
-    LOG.info("Reading input split file from : " + file);
-    long offset = splitMetaInfo.getStartOffset();
-    
-    FSDataInputStream inFile = fs.open(file);
-    inFile.seek(offset);
-    String className = Text.readString(inFile);
-    Class<org.apache.hadoop.mapred.InputSplit> cls;
-    try {
-      cls = 
-          (Class<org.apache.hadoop.mapred.InputSplit>) 
-          jobConf.getClassByName(className);
-    } catch (ClassNotFoundException ce) {
-      IOException wrap = new IOException("Split class " + className + 
-          " not found");
-      wrap.initCause(ce);
-      throw wrap;
-    }
-    SerializationFactory factory = new SerializationFactory(jobConf);
-    Deserializer<org.apache.hadoop.mapred.InputSplit> deserializer = 
-        (Deserializer<org.apache.hadoop.mapred.InputSplit>) 
-        factory.getDeserializer(cls);
-    deserializer.open(inFile);
-    org.apache.hadoop.mapred.InputSplit split = deserializer.deserialize(null);
-    long pos = inFile.getPos();
-    inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
-        .increment(pos - offset);
-    inFile.close();
-    return split;
-  }
-
-  @SuppressWarnings("unchecked")
-  private org.apache.hadoop.mapreduce.InputSplit getNewSplitDetails(
-      TaskSplitIndex splitMetaInfo) throws IOException {
-    Path file = new Path(splitMetaInfo.getSplitLocation());
-    long offset = splitMetaInfo.getStartOffset();
-    
-    // Split information read from local filesystem.
-    FileSystem fs = FileSystem.getLocal(jobConf);
-    file = fs.makeQualified(file);
-    LOG.info("Reading input split file from : " + file);
-    FSDataInputStream inFile = fs.open(file);
-    inFile.seek(offset);
-    String className = Text.readString(inFile);
-    Class<org.apache.hadoop.mapreduce.InputSplit> cls;
-    try {
-      cls = 
-          (Class<org.apache.hadoop.mapreduce.InputSplit>) 
-          jobConf.getClassByName(className);
-    } catch (ClassNotFoundException ce) {
-      IOException wrap = new IOException("Split class " + className + 
-          " not found");
-      wrap.initCause(ce);
-      throw wrap;
-    }
-    SerializationFactory factory = new SerializationFactory(jobConf);
-    Deserializer<org.apache.hadoop.mapreduce.InputSplit> deserializer = 
-        (Deserializer<org.apache.hadoop.mapreduce.InputSplit>) 
-        factory.getDeserializer(cls);
-    deserializer.open(inFile);
-    org.apache.hadoop.mapreduce.InputSplit split = 
-        deserializer.deserialize(null);
-    long pos = inFile.getPos();
-    inputContext.getCounters().findCounter(TaskCounter.SPLIT_RAW_BYTES)
-        .increment(pos - offset);
-    inFile.close();
-    return split;
-  }
-
-  private void setIncrementalConfigParams(InputSplit inputSplit) {
-    if (inputSplit instanceof FileSplit) {
-      FileSplit fileSplit = (FileSplit) inputSplit;
-      this.incrementalConf = new Configuration(false);
-
-      this.incrementalConf.set(JobContext.MAP_INPUT_FILE, fileSplit.getPath()
-          .toString());
-      this.incrementalConf.setLong(JobContext.MAP_INPUT_START,
-          fileSplit.getStart());
-      this.incrementalConf.setLong(JobContext.MAP_INPUT_PATH,
-          fileSplit.getLength());
-    }
-    LOG.info("Processing split: " + inputSplit);
-  }
-
-  private long getInputBytes() {
-    if (fsStats == null) return 0;
-    long bytesRead = 0;
-    for (Statistics stat: fsStats) {
-      bytesRead = bytesRead + stat.getBytesRead();
-    }
-    return bytesRead;
-  }
-
-  protected TaskSplitMetaInfo[] readSplits(Configuration conf)
-      throws IOException {
-    TaskSplitMetaInfo[] allTaskSplitMetaInfo;
-    allTaskSplitMetaInfo = SplitMetaInfoReaderTez.readSplitMetaInfo(conf,
-        FileSystem.getLocal(conf));
-    return allTaskSplitMetaInfo;
-  }
-  
-  private class MRInputKVReader implements KVReader {
-    
-    Object key;
-    Object value;
-
-    private SimpleValueIterator valueIterator = new SimpleValueIterator();
-    private SimpleIterable valueIterable = new SimpleIterable(valueIterator);
-
-    private final boolean localNewApi;
-    
-    MRInputKVReader() {
-      localNewApi = useNewApi;
-      if (!localNewApi) {
-        key = oldRecordReader.createKey();
-        value =oldRecordReader.createValue();
-      }
-    }
-    
-    // Setup the values iterator once, and set value on the same object each time
-    // to prevent lots of objects being created.
-
-    
-    @SuppressWarnings("unchecked")
-    @Override
-    public boolean next() throws IOException {
-      boolean hasNext = false;
-      long bytesInPrev = getInputBytes();
-      if (localNewApi) {
-        try {
-          hasNext = newRecordReader.nextKeyValue();
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new IOException("Interrupted while checking for next key-value", e);
-        }
-      } else {
-        hasNext = oldRecordReader.next(key, value);
-      }
-      long bytesInCurr = getInputBytes();
-      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
-      
-      if (hasNext) {
-        inputRecordCounter.increment(1);
-      }
-      
-      return hasNext;
-    }
-
-    @Override
-    public KVRecord getCurrentKV() throws IOException {
-      KVRecord kvRecord = null;
-      if (localNewApi) {
-        try {
-          valueIterator.setValue(newRecordReader.getCurrentValue());
-          kvRecord = new KVRecord(newRecordReader.getCurrentKey(), valueIterable);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new IOException("Interrupted while fetching next key-value", e);
-        }
-        
-      } else {
-        valueIterator.setValue(value);
-        kvRecord = new KVRecord(key, valueIterable);
-      }
-      return kvRecord;
-    }
-  };
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInputLegacy.java
deleted file mode 100644
index 8f07a38..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInputLegacy.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.newinput;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.mapred.RecordReader;
-
-public class SimpleInputLegacy extends SimpleInput {
-
-  @Private
-  public org.apache.hadoop.mapreduce.InputSplit getNewInputSplit() {
-    return this.newInputSplit;
-  }  
-  
-  @SuppressWarnings("rawtypes")
-  @Private
-  public RecordReader getOldRecordReader() {
-    return this.oldRecordReader;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
deleted file mode 100644
index a8fb900..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newoutput/SimpleOutput.java
+++ /dev/null
@@ -1,326 +0,0 @@
-package org.apache.tez.mapreduce.newoutput;
-
-import java.io.IOException;
-import java.text.NumberFormat;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.apache.hadoop.mapred.FileOutputCommitter;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.KVWriter;
-import org.apache.tez.engine.newapi.LogicalOutput;
-import org.apache.tez.engine.newapi.TezOutputContext;
-import org.apache.tez.mapreduce.common.Utils;
-import org.apache.tez.mapreduce.hadoop.MRConfig;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.newmapred.MRReporter;
-import org.apache.tez.mapreduce.hadoop.newmapreduce.TaskAttemptContextImpl;
-import org.apache.tez.mapreduce.newprocessor.MRTaskReporter;
-
-public class SimpleOutput implements LogicalOutput {
-
-  private static final Log LOG = LogFactory.getLog(SimpleOutput.class);
-
-  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
-  static {
-    NUMBER_FORMAT.setMinimumIntegerDigits(5);
-    NUMBER_FORMAT.setGroupingUsed(false);
-  }
-
-  private TezOutputContext outputContext;
-  private JobConf jobConf;
-  boolean useNewApi;
-  private AtomicBoolean closed = new AtomicBoolean(false);
-
-  @SuppressWarnings("rawtypes")
-  org.apache.hadoop.mapreduce.OutputFormat newOutputFormat;
-  @SuppressWarnings("rawtypes")
-  org.apache.hadoop.mapreduce.RecordWriter newRecordWriter;
-
-  @SuppressWarnings("rawtypes")
-  org.apache.hadoop.mapred.OutputFormat oldOutputFormat;
-  @SuppressWarnings("rawtypes")
-  org.apache.hadoop.mapred.RecordWriter oldRecordWriter;
-
-  private TezCounter outputRecordCounter;
-  private TezCounter fileOutputByteCounter;
-  private List<Statistics> fsStats;
-
-  private TaskAttemptContext newApiTaskAttemptContext;
-  private org.apache.hadoop.mapred.TaskAttemptContext oldApiTaskAttemptContext;
-
-  private boolean isMapperOutput;
-
-  private OutputCommitter committer;
-
-  @Override
-  public List<Event> initialize(TezOutputContext outputContext)
-      throws IOException, InterruptedException {
-    LOG.info("Initializing Simple Output");
-    this.outputContext = outputContext;
-    Configuration conf = TezUtils.createConfFromUserPayload(
-        outputContext.getUserPayload());
-    this.jobConf = new JobConf(conf);
-    this.useNewApi = this.jobConf.getUseNewMapper();
-    this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
-        false);
-    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
-        outputContext.getDAGAttemptNumber());
-
-    outputRecordCounter = outputContext.getCounters().findCounter(
-        TaskCounter.MAP_OUTPUT_RECORDS);
-    fileOutputByteCounter = outputContext.getCounters().findCounter(
-        FileOutputFormatCounter.BYTES_WRITTEN);
-
-    if (useNewApi) {
-      newApiTaskAttemptContext = createTaskAttemptContext();
-      try {
-        newOutputFormat =
-            ReflectionUtils.newInstance(
-                newApiTaskAttemptContext.getOutputFormatClass(), jobConf);
-      } catch (ClassNotFoundException cnfe) {
-        throw new IOException(cnfe);
-      }
-
-      List<Statistics> matchedStats = null;
-      if (newOutputFormat instanceof
-          org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
-        matchedStats =
-            Utils.getFsStatistics(
-                org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
-                    .getOutputPath(newApiTaskAttemptContext),
-                jobConf);
-      }
-      fsStats = matchedStats;
-
-      long bytesOutPrev = getOutputBytes();
-      try {
-        newRecordWriter =
-            newOutputFormat.getRecordWriter(newApiTaskAttemptContext);
-      } catch (InterruptedException e) {
-        throw new IOException("Interrupted while creating record writer", e);
-      }
-      long bytesOutCurr = getOutputBytes();
-      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
-    } else {
-      TaskAttemptID taskAttemptId = new TaskAttemptID(new TaskID(Long.toString(
-          outputContext.getApplicationId().getClusterTimestamp()),
-          outputContext.getApplicationId().getId(),
-          (isMapperOutput ? TaskType.MAP : TaskType.REDUCE),
-          outputContext.getTaskIndex()),
-          outputContext.getTaskAttemptNumber());
-      jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
-      jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
-      jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
-      jobConf.setInt(JobContext.TASK_PARTITION,
-          taskAttemptId.getTaskID().getId());
-      jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
-
-      oldApiTaskAttemptContext =
-          new org.apache.tez.mapreduce.hadoop.newmapred.TaskAttemptContextImpl(
-              jobConf, taskAttemptId,
-              new MRTaskReporter(outputContext));
-      oldOutputFormat = jobConf.getOutputFormat();
-
-      List<Statistics> matchedStats = null;
-      if (oldOutputFormat
-          instanceof org.apache.hadoop.mapred.FileOutputFormat) {
-        matchedStats =
-            Utils.getFsStatistics(
-                org.apache.hadoop.mapred.FileOutputFormat.getOutputPath(
-                    jobConf),
-                jobConf);
-      }
-      fsStats = matchedStats;
-
-      FileSystem fs = FileSystem.get(jobConf);
-      String finalName = getOutputName();
-
-      long bytesOutPrev = getOutputBytes();
-      oldRecordWriter =
-          oldOutputFormat.getRecordWriter(
-              fs, jobConf, finalName, new MRReporter(outputContext));
-      long bytesOutCurr = getOutputBytes();
-      fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
-    }
-    initCommitter(jobConf, useNewApi);
-
-    LOG.info("Initialized Simple Output"
-        + ", using_new_api: " + useNewApi);
-    return null;
-  }
-
-  public void initCommitter(JobConf job, boolean useNewApi)
-      throws IOException, InterruptedException {
-
-    if (useNewApi) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("using new api for output committer");
-      }
-
-      OutputFormat<?, ?> outputFormat = null;
-      try {
-        outputFormat = ReflectionUtils.newInstance(
-            newApiTaskAttemptContext.getOutputFormatClass(), job);
-      } catch (ClassNotFoundException cnfe) {
-        throw new IOException("Unknown OutputFormat", cnfe);
-      }
-      this.committer = outputFormat.getOutputCommitter(
-          newApiTaskAttemptContext);
-    } else {
-      this.committer = job.getOutputCommitter();
-    }
-
-    Path outputPath = FileOutputFormat.getOutputPath(job);
-    if (outputPath != null) {
-      if ((this.committer instanceof FileOutputCommitter)) {
-        FileOutputFormat.setWorkOutputPath(job,
-            ((FileOutputCommitter) this.committer).getTaskAttemptPath(
-                oldApiTaskAttemptContext));
-      } else {
-        FileOutputFormat.setWorkOutputPath(job, outputPath);
-      }
-    }
-    if (useNewApi) {
-      this.committer.setupTask(newApiTaskAttemptContext);
-    } else {
-      this.committer.setupTask(oldApiTaskAttemptContext);
-    }
-  }
-
-  public boolean isCommitRequired() throws IOException {
-    if (useNewApi) {
-      return committer.needsTaskCommit(newApiTaskAttemptContext);
-    } else {
-      return committer.needsTaskCommit(oldApiTaskAttemptContext);
-    }
-  }
-
-  private TaskAttemptContext createTaskAttemptContext() {
-    return new TaskAttemptContextImpl(this.jobConf, outputContext,
-        isMapperOutput);
-  }
-
-  private long getOutputBytes() {
-    if (fsStats == null) return 0;
-    long bytesWritten = 0;
-    for (Statistics stat: fsStats) {
-      bytesWritten = bytesWritten + stat.getBytesWritten();
-    }
-    return bytesWritten;
-  }
-
-  private String getOutputName() {
-    return "part-" + NUMBER_FORMAT.format(outputContext.getTaskIndex());
-  }
-
-  @Override
-  public KVWriter getWriter() throws IOException {
-    return new KVWriter() {
-      private final boolean useNewWriter = useNewApi;
-
-      @SuppressWarnings("unchecked")
-      @Override
-      public void write(Object key, Object value) throws IOException {
-        long bytesOutPrev = getOutputBytes();
-        if (useNewWriter) {
-          try {
-            newRecordWriter.write(key, value);
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new IOException("Interrupted while writing next key-value",e);
-          }
-        } else {
-          oldRecordWriter.write(key, value);
-        }
-
-        long bytesOutCurr = getOutputBytes();
-        fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
-        outputRecordCounter.increment(1);
-      }
-    };
-  }
-
-  @Override
-  public void handleEvents(List<Event> outputEvents) {
-    // Not expecting any events at the moment.
-  }
-
-  @Override
-  public synchronized List<Event> close() throws IOException {
-    if (closed.getAndSet(true)) {
-      return null;
-    }
-
-    LOG.info("Closing Simple Output");
-    long bytesOutPrev = getOutputBytes();
-    if (useNewApi) {
-      try {
-        newRecordWriter.close(newApiTaskAttemptContext);
-      } catch (InterruptedException e) {
-        throw new IOException("Interrupted while closing record writer", e);
-      }
-    } else {
-      oldRecordWriter.close(null);
-    }
-    long bytesOutCurr = getOutputBytes();
-    fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
-    LOG.info("Closed Simple Output");
-    return null;
-  }
-
-  @Override
-  public void setNumPhysicalOutputs(int numOutputs) {
-    // Nothing to do for now
-  }
-
-  /**
-   * SimpleOutput expects the Processor to call commit() before the
-   * Processor completes.
-   * @throws IOException
-   */
-  public void commit() throws IOException {
-    close();
-    if (useNewApi) {
-      committer.commitTask(newApiTaskAttemptContext);
-    } else {
-      committer.commitTask(oldApiTaskAttemptContext);
-    }
-  }
-
-
-  /**
-   * SimpleOutput expects the Processor to call abort() on any error
-   * (including an error during commit) before the Processor completes.
-   * @throws IOException
-   */
-  public void abort() throws IOException {
-    close();
-    if (useNewApi) {
-      committer.abortTask(newApiTaskAttemptContext);
-    } else {
-      committer.abortTask(oldApiTaskAttemptContext);
-    }
-  }
-
-}
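For reference: the byte accounting in the deleted SimpleOutput above (initialize(), getWriter().write() and close()) always brackets the I/O call with two getOutputBytes() samples and credits only the difference, since FileSystem.Statistics totals are cumulative for the whole filesystem. A minimal standalone sketch of that delta-counting pattern, in plain Java with hypothetical ByteStats, Counter and IORunnable types standing in for FileSystem.Statistics, TezCounter and the wrapped record-writer call:

import java.io.IOException;
import java.util.List;

public class DeltaByteCounting {

  /** Hypothetical stand-in for FileSystem.Statistics: a cumulative byte total. */
  interface ByteStats {
    long getBytesWritten();
  }

  /** Hypothetical stand-in for TezCounter: only increments are needed here. */
  static class Counter {
    private long value;
    void increment(long delta) { value += delta; }
    long getValue() { return value; }
  }

  /** Hypothetical wrapper for the record-writer call being measured. */
  interface IORunnable {
    void run() throws IOException;
  }

  private final List<ByteStats> fsStats; // may be null if no FileOutputFormat matched
  private final Counter fileOutputByteCounter = new Counter();

  DeltaByteCounting(List<ByteStats> fsStats) {
    this.fsStats = fsStats;
  }

  /** Sum the cumulative totals across all matched statistics, 0 if none matched. */
  private long getOutputBytes() {
    if (fsStats == null) {
      return 0;
    }
    long bytesWritten = 0;
    for (ByteStats stat : fsStats) {
      bytesWritten += stat.getBytesWritten();
    }
    return bytesWritten;
  }

  /** Bracket an I/O call with two samples and credit only the difference. */
  void runAndCount(IORunnable io) throws IOException {
    long before = getOutputBytes();
    io.run();
    long after = getOutputBytes();
    fileOutputByteCounter.increment(after - before);
  }
}

Crediting only the delta keeps the BYTES_WRITTEN counter scoped to the bracketed call instead of picking up writes made elsewhere against the same filesystem.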

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java
deleted file mode 100644
index dcea35c..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newpartition/MRPartitioner.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.mapreduce.newpartition;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class MRPartitioner implements org.apache.tez.engine.api.Partitioner {
-
-  static final Log LOG = LogFactory.getLog(MRPartitioner.class);
-
-  private final boolean useNewApi;
-  private int partitions = 1;
-
-  private org.apache.hadoop.mapreduce.Partitioner newPartitioner;
-  private org.apache.hadoop.mapred.Partitioner oldPartitioner;
-
-  public MRPartitioner(Configuration conf) {
-    this.useNewApi = ConfigUtils.useNewApi(conf);
-    this.partitions = conf.getInt(TezJobConfig.TEZ_ENGINE_NUM_EXPECTED_PARTITIONS, 1);
-
-    if (useNewApi) {
-      if (partitions > 1) {
-        newPartitioner = (org.apache.hadoop.mapreduce.Partitioner) ReflectionUtils
-            .newInstance(
-                (Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>>) conf
-                    .getClass(MRJobConfig.PARTITIONER_CLASS_ATTR,
-                        org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class), conf);
-      } else {
-        newPartitioner = new org.apache.hadoop.mapreduce.Partitioner() {
-          @Override
-          public int getPartition(Object key, Object value, int numPartitions) {
-            return numPartitions - 1;
-          }
-        };
-      }
-    } else {
-      if (partitions > 1) {
-        oldPartitioner = (org.apache.hadoop.mapred.Partitioner) ReflectionUtils.newInstance(
-            (Class<? extends org.apache.hadoop.mapred.Partitioner>) conf.getClass(
-                "mapred.partitioner.class", org.apache.hadoop.mapred.lib.HashPartitioner.class), conf);
-      } else {
-        oldPartitioner = new org.apache.hadoop.mapred.Partitioner() {
-          @Override
-          public void configure(JobConf job) {
-          }
-
-          @Override
-          public int getPartition(Object key, Object value, int numPartitions) {
-            return numPartitions - 1;
-          }
-        };
-      }
-    }
-  }
-
-  @Override
-  public int getPartition(Object key, Object value, int numPartitions) {
-    if (useNewApi) {
-      return newPartitioner.getPartition(key, value, numPartitions);
-    } else {
-      return oldPartitioner.getPartition(key, value, numPartitions);
-    }
-  }
-}
\ No newline at end of file
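The deleted MRPartitioner above picks its delegate once, at construction time (old vs. new API via reflection, or a trivial "last bucket" partitioner when only one partition is expected), so the per-record getPartition() call is a single virtual dispatch. A self-contained sketch of that shape, with a hypothetical HashDelegate in place of the reflected user class:

public class DelegatingPartitioner {

  /** Common shape of both the mapred and mapreduce partitioner contracts. */
  interface Partition {
    int getPartition(Object key, Object value, int numPartitions);
  }

  private final Partition delegate;

  public DelegatingPartitioner(int numExpectedPartitions) {
    if (numExpectedPartitions <= 1) {
      // Single partition: every record lands in the last (and only) bucket,
      // so no user partitioner class needs to be loaded at all.
      this.delegate = new Partition() {
        public int getPartition(Object key, Object value, int numPartitions) {
          return numPartitions - 1;
        }
      };
    } else {
      // The real class reflects the configured mapred or mapreduce partitioner
      // here; a hash delegate keeps this sketch self-contained.
      this.delegate = new HashDelegate();
    }
  }

  /** Fallback mirroring HashPartitioner semantics. */
  static class HashDelegate implements Partition {
    public int getPartition(Object key, Object value, int numPartitions) {
      return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
  }

  public int getPartition(Object key, Object value, int numPartitions) {
    return delegate.getPartition(key, value, numPartitions);
  }
}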

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/FileSystemStatisticsUpdater.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/FileSystemStatisticsUpdater.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/FileSystemStatisticsUpdater.java
deleted file mode 100644
index 5f97049..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/FileSystemStatisticsUpdater.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.mapreduce.newprocessor;
-
-import java.util.List;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.tez.common.counters.FileSystemCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.common.counters.TezCounters;
-
-  /**
-   * An updater that tracks the last number reported for a given file
-   * system and only creates the counters when they are needed.
-   */
-  class FileSystemStatisticUpdater {
-    private List<FileSystem.Statistics> stats;
-    private TezCounter readBytesCounter, writeBytesCounter,
-        readOpsCounter, largeReadOpsCounter, writeOpsCounter;
-    private String scheme;
-    private TezCounters counters;
-
-    FileSystemStatisticUpdater(TezCounters counters, List<FileSystem.Statistics> stats, String scheme) {
-      this.stats = stats;
-      this.scheme = scheme;
-      this.counters = counters;
-    }
-
-    void updateCounters() {
-      if (readBytesCounter == null) {
-        readBytesCounter = counters.findCounter(scheme,
-            FileSystemCounter.BYTES_READ);
-      }
-      if (writeBytesCounter == null) {
-        writeBytesCounter = counters.findCounter(scheme,
-            FileSystemCounter.BYTES_WRITTEN);
-      }
-      if (readOpsCounter == null) {
-        readOpsCounter = counters.findCounter(scheme,
-            FileSystemCounter.READ_OPS);
-      }
-      if (largeReadOpsCounter == null) {
-        largeReadOpsCounter = counters.findCounter(scheme,
-            FileSystemCounter.LARGE_READ_OPS);
-      }
-      if (writeOpsCounter == null) {
-        writeOpsCounter = counters.findCounter(scheme,
-            FileSystemCounter.WRITE_OPS);
-      }
-      long readBytes = 0;
-      long writeBytes = 0;
-      long readOps = 0;
-      long largeReadOps = 0;
-      long writeOps = 0;
-      for (FileSystem.Statistics stat: stats) {
-        readBytes = readBytes + stat.getBytesRead();
-        writeBytes = writeBytes + stat.getBytesWritten();
-        readOps = readOps + stat.getReadOps();
-        largeReadOps = largeReadOps + stat.getLargeReadOps();
-        writeOps = writeOps + stat.getWriteOps();
-      }
-      readBytesCounter.setValue(readBytes);
-      writeBytesCounter.setValue(writeBytes);
-      readOpsCounter.setValue(readOps);
-      largeReadOpsCounter.setValue(largeReadOps);
-      writeOpsCounter.setValue(writeOps);
-    }
-  }
-  
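The updater above looks each counter up lazily on its first use and then writes absolute values with setValue() rather than incrementing, because the FileSystem.Statistics figures it reads are already cumulative. A small standalone sketch of that pattern, with hypothetical Counter and findCounter stand-ins for TezCounter and TezCounters:

import java.util.HashMap;
import java.util.Map;

public class LazyCounterUpdater {

  /** Hypothetical stand-in for TezCounter; only setValue is needed here. */
  static class Counter {
    private long value;
    void setValue(long v) { this.value = v; }
    long getValue() { return value; }
  }

  private final Map<String, Counter> registry = new HashMap<String, Counter>();
  private Counter bytesWrittenCounter; // looked up lazily, then cached

  /** Hypothetical stand-in for TezCounters.findCounter(scheme, counter). */
  private Counter findCounter(String name) {
    Counter c = registry.get(name);
    if (c == null) {
      c = new Counter();
      registry.put(name, c);
    }
    return c;
  }

  /**
   * cumulativeBytes is expected to be a running total (as FileSystem.Statistics
   * values are), so the counter is set, not incremented: incrementing would
   * double-count bytes already reported by earlier calls.
   */
  void update(long cumulativeBytes) {
    if (bytesWrittenCounter == null) {
      bytesWrittenCounter = findCounter("BYTES_WRITTEN");
    }
    bytesWrittenCounter.setValue(cumulativeBytes);
  }
}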

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/GcTimeUpdater.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/GcTimeUpdater.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/GcTimeUpdater.java
deleted file mode 100644
index b05f011..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/GcTimeUpdater.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.mapreduce.newprocessor;
-
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.util.List;
-
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.common.counters.TaskCounter;
-
-/**
-   * An updater that tracks the amount of time this task has spent in GC.
-   */
-  class GcTimeUpdater {
-    private long lastGcMillis = 0;
-    private List<GarbageCollectorMXBean> gcBeans = null;
-    TezCounters counters;
-
-    public GcTimeUpdater(TezCounters counters) {
-      this.gcBeans = ManagementFactory.getGarbageCollectorMXBeans();
-      getElapsedGc(); // Initialize 'lastGcMillis' with the current time spent.
-      this.counters = counters;
-    }
-
-    /**
-     * @return the number of milliseconds that the gc has used for CPU
-     * since the last time this method was called.
-     */
-    protected long getElapsedGc() {
-      long thisGcMillis = 0;
-      for (GarbageCollectorMXBean gcBean : gcBeans) {
-        thisGcMillis += gcBean.getCollectionTime();
-      }
-
-      long delta = thisGcMillis - lastGcMillis;
-      this.lastGcMillis = thisGcMillis;
-      return delta;
-    }
-
-    /**
-     * Increment the gc-elapsed-time counter.
-     */
-    public void incrementGcCounter() {
-      if (null == counters) {
-        return; // nothing to do.
-      }
-
-      TezCounter gcCounter =
-        counters.findCounter(TaskCounter.GC_TIME_MILLIS);
-      if (null != gcCounter) {
-        gcCounter.increment(getElapsedGc());
-      }
-    }
-  }
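GcTimeUpdater above relies only on the standard java.lang.management API: it sums GarbageCollectorMXBean.getCollectionTime() across all collectors and reports the delta since the previous sample. A runnable standalone illustration of the same technique (the guard for collectors that report -1 is an addition of this sketch, not in the deleted class):

import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;

public class GcTimeSampler {

  private final List<GarbageCollectorMXBean> gcBeans =
      ManagementFactory.getGarbageCollectorMXBeans();
  private long lastGcMillis;

  public GcTimeSampler() {
    // Prime the baseline so the first delta covers only time spent after construction.
    lastGcMillis = totalGcMillis();
  }

  private long totalGcMillis() {
    long total = 0;
    for (GarbageCollectorMXBean bean : gcBeans) {
      long t = bean.getCollectionTime(); // -1 if the collector does not report time
      if (t > 0) {
        total += t;
      }
    }
    return total;
  }

  /** Milliseconds spent in GC since the last call (or since construction). */
  public long elapsedGcMillis() {
    long now = totalGcMillis();
    long delta = now - lastGcMillis;
    lastGcMillis = now;
    return delta;
  }

  public static void main(String[] args) {
    GcTimeSampler sampler = new GcTimeSampler();
    long sink = 0;
    // Allocate some garbage so the collector has work to do.
    for (int i = 0; i < 1000000; i++) {
      sink += new byte[1024].length;
    }
    System.out.println("GC time since start: " + sampler.elapsedGcMillis()
        + " ms (allocated " + sink + " bytes)");
  }
}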

