tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-396. SimpleInput should be initialized before the processor and within the same thread as the processing. (sseth)
Date Mon, 26 Aug 2013 20:39:47 GMT
Updated Branches:
  refs/heads/master 6725cc590 -> 507c2dc67


TEZ-396. SimpleInput should be initialized before the processor and
within the same thread as the processing. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/507c2dc6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/507c2dc6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/507c2dc6

Branch: refs/heads/master
Commit: 507c2dc676ba1205173844a5d2f862307dff1c9d
Parents: 6725cc5
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon Aug 26 13:39:21 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Mon Aug 26 13:39:21 2013 -0700

----------------------------------------------------------------------
 .../mapreduce/processor/map/MapProcessor.java   | 26 +++++++++-----------
 1 file changed, 12 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/507c2dc6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index b212d5e..67794e7 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -137,7 +137,9 @@ public class MapProcessor extends MRTask implements Processor {
       final Master master
       ) throws IOException, InterruptedException {
     
-    FutureTask<Void> initInputFuture = initInputAsync(input);
+    // Initialize input in-line since it sets parameters which may be used by the processor.
+    // Done only for SimpleInput.
+    input.initialize(job, master);
     FutureTask<Void> initOutputFuture = initOutputAsync(output);
     
     RecordReader in = new OldRecordReader(input);
@@ -151,12 +153,9 @@ public class MapProcessor extends MRTask implements Processor {
     MapRunnable runner =
         (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
 
-    // Wait for input/output to be initialized before starting processing.
-    LOG.info("Waiting on input initialization");
-    waitForInputInitialization(initInputFuture);
-
+    // Wait for output to be initialized before starting processing.
     LOG.info("Waiting on output initialization");
-    waitForInputInitialization(initOutputFuture);
+    waitForOutputInitialization(initOutputFuture);
 
     try {
       runner.run(in, collector, (Reporter)reporter);
@@ -179,11 +178,13 @@ public class MapProcessor extends MRTask implements Processor {
       Output out,
       final Master master
       ) throws IOException, InterruptedException {
-    // make a task context so we can get the classes
-    
-    FutureTask<Void> initInputFuture = initInputAsync(in);
+
+    // Initialize input in-line since it sets parameters which may be used by the processor.
+    // Done only for SimpleInput.
+    in.initialize(job, master);
     FutureTask<Void> initOutputFuture = initOutputAsync(out);
     
+    // make a task context so we can get the classes
     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
         new TaskAttemptContextImpl(job, taskAttemptId, reporter);
 
@@ -206,12 +207,9 @@ public class MapProcessor extends MRTask implements Processor {
     org.apache.hadoop.mapreduce.RecordWriter output = 
         new NewOutputCollector(out);
 
-    // Wait for input/output to be initialized before starting processing.
-    LOG.info("Waiting on input initialization");
-    waitForInputInitialization(initInputFuture);
-
+    // Wait for output to be initialized before starting processing.
     LOG.info("Waiting on output initialization");
-    waitForInputInitialization(initOutputFuture);
+    waitForOutputInitialization(initOutputFuture);
 
     org.apache.hadoop.mapreduce.InputSplit split = in.getNewInputSplit();
     


Mime
View raw message