tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [2/2] tez git commit: TEZ-3437. Improve synchronization and the progress report behavior for Inputs from TEZ-3317. Contributed by Kuhu Shukla.
Date Wed, 26 Oct 2016 18:06:24 GMT
TEZ-3437. Improve synchronization and the progress report behavior for
Inputs from TEZ-3317. Contributed by Kuhu Shukla.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/bdf5b69f
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/bdf5b69f
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/bdf5b69f

Branch: refs/heads/branch-0.7
Commit: bdf5b69f430959fbd83a5902df14e7ce4dd08078
Parents: 0a6db28
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Oct 26 11:04:57 2016 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Oct 26 11:04:57 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/common/ProgressHelper.java   | 89 ++++++++++++++++++++
 .../tez/runtime/api/AbstractLogicalInput.java   |  2 +-
 .../tez/runtime/api/MergedLogicalInput.java     |  2 +-
 .../runtime/api/ProgressFailedException.java    | 46 ++++++++++
 .../org/apache/tez/mapreduce/input/MRInput.java |  9 +-
 .../tez/mapreduce/input/MRInputLegacy.java      |  3 +-
 .../mapreduce/processor/map/MapProcessor.java   | 54 +++++-------
 .../processor/reduce/ReduceProcessor.java       | 38 ++-------
 .../tez/mapreduce/processor/MapUtils.java       |  9 +-
 .../processor/map/TestMapProcessor.java         | 78 ++---------------
 .../processor/reduce/TestReduceProcessor.java   |  2 +-
 .../api/impl/TezProcessorContextImpl.java       |  2 +-
 .../common/readers/UnorderedKVReader.java       | 26 +++---
 .../common/shuffle/impl/ShuffleManager.java     |  4 +-
 .../input/ConcatenatedMergedKeyValueInput.java  | 11 ++-
 .../input/ConcatenatedMergedKeyValuesInput.java | 11 ++-
 .../library/input/OrderedGroupedKVInput.java    |  3 +-
 .../input/OrderedGroupedMergedKVInput.java      |  3 +-
 .../runtime/library/input/UnorderedKVInput.java |  9 +-
 .../library/processor/SimpleProcessor.java      | 42 ++-------
 .../library/processor/SleepProcessor.java       | 39 ++-------
 .../processor/FilterByWordInputProcessor.java   | 39 ++-------
 .../processor/FilterByWordOutputProcessor.java  |  3 +-
 24 files changed, 253 insertions(+), 272 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e2eaf70..225b449 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3437. Improve synchronization and the progress report behavior for Inputs from TEZ-3317.
   TEZ-3317. Speculative execution starts too early due to 0 progress.
   TEZ-3452. Auto-reduce parallelism calculation can overflow with large inputs
   TEZ-3439. Tez joinvalidate fails when first input argument size is bigger than the second.

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java b/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java
new file mode 100644
index 0000000..407a20e
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/ProgressHelper.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.ProgressFailedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/** Periodically reports the averaged progress of a processor's inputs to its context. */
+public class ProgressHelper {
+  private static final Logger LOG = LoggerFactory.getLogger(ProgressHelper.class);
+  private final String processorName;
+  protected final Map<String, LogicalInput> inputs;
+  final ProcessorContext processorContext;
+
+  volatile ScheduledExecutorService scheduledExecutorService;
+  Runnable monitorProgress = new Runnable() {
+    @Override
+    public void run() {
+      try {
+        float progress = 1.0f;
+        if (inputs != null && !inputs.isEmpty()) {
+          float progSum = 0.0f;
+          for (LogicalInput input : inputs.values()) {
+            if (input instanceof AbstractLogicalInput) {
+              progSum += ((AbstractLogicalInput) input).getProgress();
+            }
+          }
+          progress = progSum / inputs.size();
+        }
+        processorContext.setProgress(progress);
+      } catch (ProgressFailedException pe) {
+        LOG.warn("Encountered ProgressFailedException during Processor progress update: " + pe);
+      } catch (InterruptedException ie) {
+        LOG.warn("Encountered InterruptedException during Processor progress update: " + ie);
+        // Restore the interrupt flag so the executor can observe shutdownNow().
+        Thread.currentThread().interrupt();
+      }
+    }
+  };
+
+  public ProgressHelper(Map<String, LogicalInput> _inputs, ProcessorContext context, String processorName) {
+    this.inputs = _inputs;
+    this.processorContext = context;
+    this.processorName = processorName;
+  }
+
+  /** Starts (or restarts) periodic progress updates; delay and period are in milliseconds. */
+  public synchronized void scheduleProgressTaskService(long delay, long period) {
+    shutDownProgressTaskService(); // do not leak a previously started executor
+    scheduledExecutorService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
+        .setDaemon(true).setNameFormat("TaskProgressService{" + processorName+ ":" + processorContext.getTaskVertexName()
+            + "} #%d").build());
+    scheduledExecutorService.scheduleWithFixedDelay(monitorProgress, delay, period,
+        TimeUnit.MILLISECONDS);
+  }
+
+  public synchronized void shutDownProgressTaskService() {
+    if (scheduledExecutorService != null) {
+      scheduledExecutorService.shutdownNow();
+      scheduledExecutorService = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
index 4c95eb9..a97f3fa 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalInput.java
@@ -82,7 +82,7 @@ public abstract class AbstractLogicalInput implements LogicalInput, LogicalInput
     return inputContext;
   }
 
-  public float getProgress() throws IOException, InterruptedException {
+  public float getProgress() throws ProgressFailedException, InterruptedException {
     return 0.0f;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
index 3195a17..e3c3624 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/MergedLogicalInput.java
@@ -94,7 +94,7 @@ public abstract class MergedLogicalInput implements LogicalInput {
    */
   public abstract void setConstituentInputIsReady(Input input);
 
-  public float getProgress() throws IOException, InterruptedException {
+  public float getProgress() throws ProgressFailedException, InterruptedException {
     return 0.0f;
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-api/src/main/java/org/apache/tez/runtime/api/ProgressFailedException.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/ProgressFailedException.java b/tez-api/src/main/java/org/apache/tez/runtime/api/ProgressFailedException.java
new file mode 100644
index 0000000..07995cc
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/ProgressFailedException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.tez.dag.api.TezException;
+
+/**
+ * Thrown by {@code getProgress()} implementations when progress cannot be determined.
+ */
+@Public
+@Evolving
+public class ProgressFailedException extends TezException {
+
+  private static final long serialVersionUID = -114180015419275775L;
+
+  public ProgressFailedException() {
+    super("Progress update failed");
+  }
+
+  public ProgressFailedException(Throwable cause) {
+    super("Progress update failed", cause);
+  }
+
+  public ProgressFailedException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 46ac87f..b93b021 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.protobuf.ByteString;
 
+import org.apache.tez.runtime.api.ProgressFailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -582,8 +583,12 @@ public class MRInput extends MRInputBase {
   }
 
   @Override
-  public float getProgress() throws IOException,InterruptedException {
-    return (mrReader != null) ? mrReader.getProgress() : 0.0f;
+  public float getProgress() throws ProgressFailedException, InterruptedException {
+    try {
+      return (mrReader != null) ? mrReader.getProgress() : 0.0f;
+    } catch (IOException e) {
+      throw new ProgressFailedException("getProgress encountered IOException ", e);
+    }
   }
 
   void processSplitEvent(InputDataInformationEvent event)

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
index 9b5ed1c..70be7ee 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.tez.runtime.api.ProgressFailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -87,7 +88,7 @@ public class MRInputLegacy extends MRInput {
     return (org.apache.hadoop.mapreduce.RecordReader) mrReader.getRecordReader();
   }
 
-  public float getProgress() throws IOException, InterruptedException {
+  public float getProgress() throws ProgressFailedException, InterruptedException {
       return super.getProgress();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index 017acf8..462fbda 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -21,9 +21,9 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Timer;
-import java.util.TimerTask;
 
+import org.apache.tez.common.ProgressHelper;
+import org.apache.tez.runtime.api.ProgressFailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -46,7 +46,6 @@ import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
-import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
@@ -63,32 +62,7 @@ public class MapProcessor extends MRTask{
 
   protected Map<String, LogicalInput> inputs;
   protected Map<String, LogicalOutput> outputs;
-  Timer progressTimer = new Timer();
-  TimerTask progressTask = new TimerTask() {
-
-    @Override
-    public void run() {
-      try {
-        float progSum = 0.0f;
-        if (inputs != null && inputs.size() != 0) {
-          for(LogicalInput input : inputs.values()) {
-            if (input instanceof AbstractLogicalInput) {
-              progSum += ((AbstractLogicalInput) input).getProgress();
-            }
-          }
-          float progress = (1.0f) * progSum / inputs.size();
-          getContext().setProgress(progress);
-          mrReporter.setProgress(progress);
-        }
-      } catch (IOException e) {
-        LOG.warn("Encountered IOException during Processor progress update"
-            + e.getMessage());
-      } catch (InterruptedException e) {
-        LOG.warn("Encountered InterruptedException during Processor progress"
-            + "update" + e.getMessage());
-      }
-    }
-  };
+  private ProgressHelper progressHelper;
 
   public MapProcessor(ProcessorContext processorContext) {
     super(processorContext, true);
@@ -101,8 +75,9 @@ public class MapProcessor extends MRTask{
   }
 
   public void close() throws IOException {
-    progressTimer.cancel();
-
+    if (progressHelper != null) {
+      progressHelper.shutDownProgressTaskService();
+    }
   }
 
   @Override
@@ -111,6 +86,7 @@ public class MapProcessor extends MRTask{
 
     this.inputs = _inputs;
     this.outputs = _outputs;
+    progressHelper = new ProgressHelper(this.inputs, getContext(), this.getClass().getSimpleName());
     LOG.info("Running map: " + processorContext.getUniqueIdentifier());
     for (LogicalInput input : _inputs.values()) {
       input.start();
@@ -129,7 +105,7 @@ public class MapProcessor extends MRTask{
     LogicalOutput out = _outputs.values().iterator().next();
 
     initTask(out);
-    progressTimer.schedule(progressTask, 0, 100);
+    progressHelper.scheduleProgressTaskService(0, 100);
 
     // Sanity check
     if (!(in instanceof MRInputLegacy)) {
@@ -315,7 +291,17 @@ public class MapProcessor extends MRTask{
 
     @Override
     public float getProgress() throws IOException, InterruptedException {
-      return in.getProgress();
+      try {
+        return in.getProgress();
+      } catch (ProgressFailedException e) {
+        if (e.getCause() instanceof IOException) {
+          throw (IOException) e.getCause();
+        }
+        if (e.getCause() instanceof InterruptedException) {
+          throw (InterruptedException) e.getCause();
+        }
+        throw new IOException("Could not get Processor progress", e);
+      }
     }
 
     @Override
@@ -366,6 +352,8 @@ public class MapProcessor extends MRTask{
     public float getProgress() throws IOException {
       try {
         return mrInput.getProgress();
+      } catch (ProgressFailedException pe) {
+        throw new IOException(pe);
       } catch (InterruptedException ie) {
         throw new IOException(ie);
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 8ec6091..4b79c78 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -22,9 +22,8 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 
+import org.apache.tez.common.ProgressHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -44,7 +43,6 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.mapreduce.output.MROutputLegacy;
 import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
-import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -68,32 +66,7 @@ public class ReduceProcessor extends MRTask {
 
   protected Map<String, LogicalInput> inputs;
   protected Map<String, LogicalOutput> outputs;
-  Timer progressTimer = new Timer();
-  TimerTask progressTask = new TimerTask() {
-
-    @Override
-    public void run() {
-      try {
-        float progSum = 0.0f;
-        if (inputs != null && inputs.size() != 0) {
-          for(LogicalInput input : inputs.values()) {
-            if (input instanceof AbstractLogicalInput) {
-              progSum += ((AbstractLogicalInput) input).getProgress();
-            }
-          }
-          float progress = (1.0f) * progSum / inputs.size();
-          getContext().setProgress(progress);
-          mrReporter.setProgress(progress);
-        }
-      } catch (IOException e) {
-        LOG.warn("Encountered IOException during Processor progress update"
-            + e.getMessage());
-      } catch (InterruptedException e) {
-        LOG.warn("Encountered InterruptedException during Processor progress"
-            + "update" + e.getMessage());
-      }
-    }
-  };
+  private ProgressHelper progressHelper;
 
   public ReduceProcessor(ProcessorContext processorContext) {
     super(processorContext, false);
@@ -106,7 +79,9 @@ public class ReduceProcessor extends MRTask {
   }
 
   public void close() throws IOException {
-    progressTimer.cancel();
+    if (progressHelper != null) {
+      progressHelper.shutDownProgressTaskService();
+    }
 
   }
 
@@ -115,6 +90,7 @@ public class ReduceProcessor extends MRTask {
       Map<String, LogicalOutput> _outputs) throws Exception {
     this.inputs = _inputs;
     this.outputs = _outputs;
+    progressHelper = new ProgressHelper(this.inputs, processorContext, this.getClass().getSimpleName());
     LOG.info("Running reduce: " + processorContext.getUniqueIdentifier());
 
     if (_outputs.size() <= 0 || _outputs.size() > 1) {
@@ -139,7 +115,7 @@ public class ReduceProcessor extends MRTask {
     out.start();
 
     initTask(out);
-    progressTimer.schedule(progressTask, 0, 100);
+    progressHelper.scheduleProgressTaskService(0, 100);
     this.statusUpdate();
 
     Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 0d0eace..0876c3c 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -117,7 +117,7 @@ public class MapUtils {
   }
   
   private static InputSplit 
-  createInputSplit(FileSystem fs, Path workDir, JobConf job, Path file) 
+  createInputSplit(FileSystem fs, Path workDir, JobConf job, Path file, int numKVs)
       throws IOException {
     FileInputFormat.setInputPaths(job, workDir);
 
@@ -131,7 +131,7 @@ public class MapUtils {
       Random r = new Random(System.currentTimeMillis());
       LongWritable key = new LongWritable();
       Text value = new Text();
-      for (int i = 10; i > 0; i--) {
+      for (int i = numKVs; i > 0; i--) {
         key.set(r.nextInt(1000));
         value.set(Integer.toString(i));
         writer.append(key, value);
@@ -188,9 +188,10 @@ public class MapUtils {
     outMeta.close();
   }
 
-  public static void generateInputSplit(FileSystem fs, Path workDir, JobConf jobConf, Path mapInput) throws IOException {
+  public static void generateInputSplit(FileSystem fs, Path workDir, JobConf jobConf, Path mapInput,
+                                        int numKVs) throws IOException {
     jobConf.setInputFormat(SequenceFileInputFormat.class);
-    InputSplit split = createInputSplit(fs, workDir, jobConf, mapInput);
+    InputSplit split = createInputSplit(fs, workDir, jobConf, mapInput, numKVs);
     writeSplitFiles(fs, jobConf, split);
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 53b8c46..3243de5 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.db.FloatSplitter;
 import org.apache.hadoop.mapreduce.split.JobSplit;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
 import org.junit.Assert;
@@ -149,7 +150,7 @@ public class TestMapProcessor {
     Path mapInput = new Path(workDir, "map0");
     
     
-    MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput);
+    MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput, 10);
 
     InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
         InputDescriptor.create(MRInputLegacy.class.getName())
@@ -221,7 +222,7 @@ public class TestMapProcessor {
     Path mapInput = new Path(workDir, "map0");
 
 
-    generateInputSplit(localFs, workDir, jobConf, mapInput);
+    MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput, 100000);
 
     InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
         InputDescriptor.create(MRInputLegacy.class.getName())
@@ -246,84 +247,17 @@ public class TestMapProcessor {
       @Override
       public void run() {
         float prog = task.getProgress();
-        if(prog > 0.0 && prog < 1.0)
+        if(prog > 0.0f && prog < 1.0f)
           progressUpdate = prog;
       }
     });
 
     task.initialize();
-    scheduler.scheduleAtFixedRate(monitorProgress, 0, 10,
+    scheduler.scheduleAtFixedRate(monitorProgress, 0, 1,
         TimeUnit.MILLISECONDS);
     task.run();
     Assert.assertTrue("Progress Updates should be captured!",
-        progressUpdate != 0.0f);
+        progressUpdate > 0.0f && progressUpdate < 1.0f);
     task.close();
   }
-
-  public static void generateInputSplit(FileSystem fs, Path workDir,
-                                        JobConf jobConf, Path mapInput)
-      throws IOException {
-    jobConf.setInputFormat(SequenceFileInputFormat.class);
-    FileInputFormat.setInputPaths(jobConf, workDir);
-
-    LOG.info("Generating data at path: " + mapInput);
-    // create a file with length entries
-    SequenceFile.Writer writer =
-        SequenceFile.createWriter(fs, jobConf, mapInput,
-            LongWritable.class, Text.class);
-    try {
-      Random r = new Random(System.currentTimeMillis());
-      LongWritable key = new LongWritable();
-      Text value = new Text();
-      for (int i = 100000; i > 0; i--) {
-        key.set(r.nextInt(1000));
-        value.set(Integer.toString(i));
-        writer.append(key, value);
-        LOG.info("<k, v> : <" + key.get() + ", " + value + ">");
-      }
-    } finally {
-      writer.close();
-    }
-
-    SequenceFileInputFormat<LongWritable, Text> format =
-        new SequenceFileInputFormat<LongWritable, Text>();
-    InputSplit[] splits = format.getSplits(jobConf, 1);
-    System.err.println("#split = " + splits.length + " ; " +
-        "#locs = " + splits[0].getLocations().length + "; " +
-        "loc = " + splits[0].getLocations()[0] + "; " +
-        "off = " + splits[0].getLength() + "; " +
-        "file = " + ((FileSplit)splits[0]).getPath());
-    writeSplitFiles(fs, jobConf, splits[0]);
-  }
-
-  private static void writeSplitFiles(FileSystem fs, JobConf conf,
-                                      InputSplit split) throws IOException {
-    Path jobSplitFile = new Path(conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR,
-        MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR_DEFAULT), MRJobConfig.JOB_SPLIT);
-    LOG.info("Writing split to: " + jobSplitFile);
-    FSDataOutputStream out = FileSystem.create(fs, jobSplitFile,
-        new FsPermission(JOB_FILE_PERMISSION));
-
-    long offset = out.getPos();
-    Text.writeString(out, split.getClass().getName());
-    split.write(out);
-    out.close();
-
-    String[] locations = split.getLocations();
-
-    JobSplit.SplitMetaInfo info = null;
-    info = new JobSplit.SplitMetaInfo(locations, offset, split.getLength());
-
-    Path jobSplitMetaInfoFile = new Path(
-        conf.get(MRFrameworkConfigs.TASK_LOCAL_RESOURCE_DIR),
-        MRJobConfig.JOB_SPLIT_METAINFO);
-
-    FSDataOutputStream outMeta = FileSystem.create(fs, jobSplitMetaInfoFile,
-        new FsPermission(JOB_FILE_PERMISSION));
-    outMeta.write(SplitMetaInfoReaderTez.META_SPLIT_FILE_HEADER);
-    WritableUtils.writeVInt(outMeta, SplitMetaInfoReaderTez.META_SPLIT_VERSION);
-    WritableUtils.writeVInt(outMeta, 1); // Only 1 split meta info being written
-    info.write(outMeta);
-    outMeta.close();
-  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 21d2929..f82468c 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -138,7 +138,7 @@ public class TestReduceProcessor {
     jobConf.setBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS, false);
     
     Path mapInput = new Path(workDir, "map0");
-    MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput);
+    MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput, 10);
 
     InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
         InputDescriptor.create(MRInputLegacy.class.getName())

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index fb13d5e..2d4ead8 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -93,7 +93,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
 
   @Override
   public void setProgress(float progress) {
-    if (runtimeTask.getProgress() != progress) {
+    if (Math.abs(progress - runtimeTask.getProgress()) >= 0.001f) {
       runtimeTask.setProgress(progress);
       notifyProgress();
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
index 9526bc3..8c10f77 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
@@ -19,6 +19,7 @@
 package org.apache.tez.runtime.library.common.readers;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.tez.runtime.api.InputContext;
 import org.slf4j.Logger;
@@ -72,8 +73,8 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
   // TODO Remove this once per I/O counters are separated properly. Relying on
   // the counter at the moment will generate aggregate numbers. 
   private int numRecordsRead = 0;
-  private long totalBytesRead = 0;
-  private long totalFileBytes = 0;
+  private final AtomicLong totalBytesRead = new AtomicLong(0);
+  private final AtomicLong totalFileBytes = new AtomicLong(0);
 
 
   public UnorderedKVReader(ShuffleManager shuffleManager, Configuration conf,
@@ -147,16 +148,15 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
     return value;
   }
 
-  public float getProgress() {
-    int numInputs = shuffleManager.getNumInputs();
-    if (totalFileBytes > 0 && numInputs > 0) {
-      return ((1.0f) * (totalBytesRead + ((currentReader != null) ? currentReader.bytesRead :
-      0l)) /
-          totalFileBytes) * (
-          shuffleManager.getNumCompletedInputs().floatValue() /
-              (1.0f * numInputs));
+  public float getProgress() throws IOException, InterruptedException {
+    final int numInputs = shuffleManager.getNumInputs();
+    if (totalFileBytes.get() > 0 && numInputs > 0) {
+      return ((1.0f) * (totalBytesRead.get() + ((currentReader != null) ? currentReader.bytesRead :
+      0.0f)) /
+          totalFileBytes.get()) * (shuffleManager.getNumCompletedInputsFloat() /
+          (1.0f * numInputs));
     }
-    return 0l;
+    return 0.0f;
   }
   /**
    * Tries reading the next key and value from the current reader.
@@ -189,7 +189,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
    */
   private boolean moveToNextInput() throws IOException {
     if (currentReader != null) { // Close the current reader.
-      totalBytesRead += currentReader.bytesRead;
+      totalBytesRead.getAndAdd(currentReader.bytesRead);
       currentReader.close();
       /**
        * clear reader explicitly. Otherwise this could point to stale reference when next() is
@@ -209,7 +209,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
       return false; // No more inputs
     } else {
       currentReader = openIFileReader(currentFetchedInput);
-      totalFileBytes += currentReader.getLength();
+      totalFileBytes.getAndAdd(currentReader.getLength());
       return true;
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index fbe62ad..eaf2cfd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -860,8 +860,8 @@ public class ShuffleManager implements FetcherCallback {
     return numInputs;
   }
 
-  public AtomicInteger getNumCompletedInputs() {
-    return numCompletedInputs;
+  public float getNumCompletedInputsFloat() {
+    return numCompletedInputs.floatValue();
   }
 
   /////////////////// End of methods for walking the available inputs

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
index 3d1805a..b00befd 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
@@ -26,6 +26,7 @@ import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.MergedLogicalInput;
+import org.apache.tez.runtime.api.ProgressFailedException;
 import org.apache.tez.runtime.api.Reader;
 import org.apache.tez.runtime.api.MergedInputContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
@@ -83,7 +84,7 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
       return currentReader.getCurrentValue();
     }
 
-    public float getProgress() {
+    public float getProgress() throws IOException, InterruptedException {
       return (1.0f)*(currentReaderIndex + 1)/getInputs().size();
     }
   }
@@ -104,7 +105,11 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
   }
 
   @Override
-  public float getProgress()  throws IOException, InterruptedException {
-    return concatenatedMergedKeyValueReader.getProgress();
+  public float getProgress() throws ProgressFailedException, InterruptedException {
+    try {
+      return concatenatedMergedKeyValueReader.getProgress();
+    } catch (IOException e) {
+      throw new ProgressFailedException("getProgress encountered IOException ", e);
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
index d4ff0bc..2c0c252 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
@@ -26,6 +26,7 @@ import org.apache.tez.dag.api.GroupInputEdge;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.MergedLogicalInput;
+import org.apache.tez.runtime.api.ProgressFailedException;
 import org.apache.tez.runtime.api.Reader;
 import org.apache.tez.runtime.api.MergedInputContext;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
@@ -85,7 +86,7 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
       return currentReader.getCurrentValues();
     }
 
-    public float getProgress() {
+    public float getProgress() throws IOException, InterruptedException {
       return (1.0f)*(currentReaderIndex + 1)/getInputs().size();
     }
   }
@@ -106,7 +107,11 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
   }
 
   @Override
-  public float getProgress()  throws IOException, InterruptedException {
-    return concatenatedMergedKeyValuesReader.getProgress();
+  public float getProgress() throws ProgressFailedException, InterruptedException {
+    try {
+      return concatenatedMergedKeyValuesReader.getProgress();
+    } catch (IOException e) {
+      throw new ProgressFailedException("getProgress encountered IOException ", e);
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index 18c765a..59c42a1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -42,6 +42,7 @@ import org.apache.tez.dag.api.TezException;
 import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.ProgressFailedException;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
@@ -258,7 +259,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
   }
 
   @Override
-  public float getProgress()  throws IOException, InterruptedException {
+  public float getProgress() throws ProgressFailedException, InterruptedException {
     int totalInputs = getNumPhysicalInputs();
     if (totalInputs != 0) {
       synchronized (this) {

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
index 5d6668d..49d4043 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedMergedKVInput.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Set;
 
+import org.apache.tez.runtime.api.ProgressFailedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -250,7 +251,7 @@ public class OrderedGroupedMergedKVInput extends MergedLogicalInput {
       }
     }
   }
-  public float getProgress() throws IOException, InterruptedException {
+  public float getProgress() throws ProgressFailedException, InterruptedException {
     float totalProgress = 0.0f;
     for(Input input : getInputs()) {
       totalProgress += ((OrderedGroupedKVInput)input).getProgress();

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index d893910..2239bf9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -41,6 +41,7 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.api.ProgressFailedException;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
@@ -288,7 +289,11 @@ public class UnorderedKVInput extends AbstractLogicalInput {
   }
 
   @Override
-  public float getProgress() {
-    return kvReader.getProgress();
+  public float getProgress() throws ProgressFailedException, InterruptedException {
+    try {
+      return kvReader.getProgress();
+    } catch (IOException e) {
+      throw new ProgressFailedException("getProgress encountered IOException ", e);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
index 66c3625..c237bc1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SimpleProcessor.java
@@ -17,15 +17,12 @@
  */
 package org.apache.tez.runtime.library.processor;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
-import org.apache.tez.runtime.api.AbstractLogicalInput;
+import org.apache.tez.common.ProgressHelper;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -49,31 +46,7 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
   protected Map<String, LogicalInput> inputs;
   protected Map<String, LogicalOutput> outputs;
 
-  Timer progressTimer = new Timer();
-  TimerTask progressTask = new TimerTask() {
-
-    @Override
-    public void run() {
-      try {
-        float progSum = 0.0f;
-        if (getInputs() != null) {
-          for(LogicalInput input : getInputs().values()) {
-            if (input instanceof AbstractLogicalInput) {
-              progSum += ((AbstractLogicalInput) input).getProgress();
-            }
-          }
-          float progress = (1.0f) * progSum / inputs.size();
-          getContext().setProgress(progress);
-        }
-      } catch (IOException e) {
-        LOG.warn("Encountered IOException during Processor progress update"
-            + e.getMessage());
-      } catch (InterruptedException e) {
-        LOG.warn("Encountered InterruptedException during Processor progress"
-            + "update" + e.getMessage());
-      }
-    }
-  };
+  protected ProgressHelper progressHelper;
 
   public SimpleProcessor(ProcessorContext context) {
     super(context);
@@ -83,6 +56,7 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
       throws Exception {
     this.inputs = _inputs;
     this.outputs = _outputs;
+    progressHelper = new ProgressHelper(this.inputs, getContext(),this.getClass().getSimpleName());
     preOp();
     run();
     postOp();
@@ -106,7 +80,7 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
       for (LogicalInput input : getInputs().values()) {
         input.start();
       }
-      progressTimer.schedule(progressTask, 0, 100);
+      progressHelper.scheduleProgressTaskService(0, 100);
     }
     if (getOutputs() != null) {
       for (LogicalOutput output : getOutputs().values()) {
@@ -136,7 +110,9 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
 
   @Override
   public void close() throws Exception {
-    progressTimer.cancel();
+    if( progressHelper != null) {
+      progressHelper.shutDownProgressTaskService();
+    }
   }
 
   public Map<String, LogicalInput> getInputs() {
@@ -146,8 +122,4 @@ public abstract class SimpleProcessor extends AbstractLogicalIOProcessor {
   public Map<String, LogicalOutput> getOutputs() {
     return outputs;
   }
-
-  public Timer getProgressTimer() {
-    return progressTimer;
-  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
index c58e338..0eb200f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/processor/SleepProcessor.java
@@ -18,21 +18,18 @@
 
 package org.apache.tez.runtime.library.processor;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 
 import com.google.common.base.Charsets;
+import org.apache.tez.common.ProgressHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -53,32 +50,7 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
   private int timeToSleepMS;
   protected Map<String, LogicalInput> inputs;
   protected Map<String, LogicalOutput> outputs;
-
-  Timer progressTimer = new Timer();
-  TimerTask progressTask = new TimerTask() {
-
-    @Override
-    public void run() {
-      try {
-        float progSum = 0.0f;
-        if (inputs != null) {
-          for(LogicalInput input : inputs.values()) {
-            if (input instanceof AbstractLogicalInput) {
-              progSum += ((AbstractLogicalInput) input).getProgress();
-            }
-          }
-          float progress = (1.0f) * progSum / inputs.size();
-          getContext().setProgress(progress);
-        }
-      } catch (IOException e) {
-        LOG.warn("Encountered IOException during Processor progress update" +
-            e.getMessage());
-      } catch (InterruptedException e) {
-        LOG.warn("Encountered InterruptedException during Processor progress" +
-            "update" + e.getMessage());
-      }
-    }
-  };
+  private ProgressHelper progressHelper;
 
   public SleepProcessor(ProcessorContext context) {
     super(context);
@@ -105,12 +77,13 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
                   Map<String, LogicalOutput> _outputs) throws Exception {
     inputs = _inputs;
     outputs = _outputs;
+    progressHelper = new ProgressHelper(this.inputs, getContext(),this.getClass().getSimpleName());
     LOG.info("Running the Sleep Processor, sleeping for "
       + timeToSleepMS + " ms");
     for (LogicalInput input : _inputs.values()) {
       input.start();
     }
-    progressTimer.schedule(progressTask, 0, 100);
+    progressHelper.scheduleProgressTaskService(0, 100);
     for (LogicalOutput output : _outputs.values()) {
       output.start();
     }
@@ -128,7 +101,9 @@ public class SleepProcessor extends AbstractLogicalIOProcessor {
 
   @Override
   public void close() throws Exception {
-    progressTimer.cancel();
+    if (progressHelper != null) {
+      progressHelper.shutDownProgressTaskService();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
index bd8490b..47c9e55 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordInputProcessor.java
@@ -18,11 +18,8 @@
 
 package org.apache.tez.mapreduce.examples.processor;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,12 +27,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.ProgressHelper;
 import org.apache.tez.mapreduce.examples.FilterLinesByWord;
 import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
-import org.apache.tez.runtime.api.AbstractLogicalInput;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -52,31 +49,7 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor {
   private String filterWord;
   protected Map<String, LogicalInput> inputs;
   protected Map<String, LogicalOutput> outputs;
-  Timer progressTimer = new Timer();
-  TimerTask progressTask = new TimerTask() {
-
-    @Override
-    public void run() {
-      try {
-        float progSum = 0.0f;
-        if (inputs != null) {
-          for(LogicalInput input : inputs.values()) {
-            if (input instanceof AbstractLogicalInput) {
-              progSum += ((AbstractLogicalInput) input).getProgress();
-            }
-          }
-          float progress = (1.0f) * progSum / inputs.size();
-          getContext().setProgress(progress);
-        }
-      } catch (IOException e) {
-        LOG.warn("Encountered IOException during Processor progress update" +
-            e.getMessage());
-      } catch (InterruptedException e) {
-        LOG.warn("Encountered InterruptedException during Processor progress" +
-            "update" + e.getMessage());
-      }
-    }
-  };
+  private ProgressHelper progressHelper;
 
   public FilterByWordInputProcessor(ProcessorContext context) {
     super(context);
@@ -100,7 +73,9 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor {
 
   @Override
   public void close() throws Exception {
-    progressTimer.cancel();
+    if (progressHelper != null) {
+      progressHelper.shutDownProgressTaskService();
+    }
   }
 
   @Override
@@ -108,7 +83,7 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor {
       Map<String, LogicalOutput> _outputs) throws Exception {
     this.inputs = _inputs;
     this.outputs = _outputs;
-    
+    this.progressHelper = new ProgressHelper(this.inputs, getContext(),this.getClass().getSimpleName());
     if (_inputs.size() != 1) {
       throw new IllegalStateException("FilterByWordInputProcessor processor can only work with a single input");
     }
@@ -133,7 +108,7 @@ public class FilterByWordInputProcessor extends AbstractLogicalIOProcessor {
     if (! (lo instanceof UnorderedKVOutput)) {
       throw new IllegalStateException("FilterByWordInputProcessor processor can only work with OnFileUnorderedKVOutput");
     }
-    progressTimer.schedule(progressTask, 0, 100);
+    progressHelper.scheduleProgressTaskService(0, 100);
     MRInputLegacy mrInput = (MRInputLegacy) li;
     mrInput.init();
     UnorderedKVOutput kvOutput = (UnorderedKVOutput) lo;

http://git-wip-us.apache.org/repos/asf/tez/blob/bdf5b69f/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
index 5872527..7acaf7e 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/processor/FilterByWordOutputProcessor.java
@@ -20,6 +20,7 @@ package org.apache.tez.mapreduce.examples.processor;
 
 import java.util.List;
 
+import org.apache.tez.common.ProgressHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.tez.mapreduce.output.MROutput;
@@ -51,7 +52,7 @@ public class FilterByWordOutputProcessor extends SimpleMRProcessor {
   @Override
   public void close() throws Exception {
     LOG.info("Broadcast Output Processor closing. Nothing to do");
-    getProgressTimer().cancel();
+    super.close();
   }
 
   @Override


Mime
View raw message