tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [1/2] git commit: TEZ-376. Parallelize I/O/P initialization in Map/ReduceProcessor. (sseth)
Date Tue, 20 Aug 2013 18:43:50 GMT
Updated Branches:
  refs/heads/master efed85db1 -> 34de31a89


TEZ-376. Parallelize I/O/P initialization in Map/ReduceProcessor.
(sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/45e1a34a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/45e1a34a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/45e1a34a

Branch: refs/heads/master
Commit: 45e1a34ac42c1e68b045dcf00932e21a9513b028
Parents: efed85d
Author: Siddharth Seth <sseth@apache.org>
Authored: Tue Aug 20 11:42:40 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Tue Aug 20 11:42:40 2013 -0700

----------------------------------------------------------------------
 .../apache/tez/mapreduce/input/SimpleInput.java | 23 +++++-
 .../apache/tez/mapreduce/processor/MRTask.java  | 78 ++++++++++++++++--
 .../mapreduce/processor/map/MapProcessor.java   | 64 +++++++--------
 .../processor/reduce/ReduceProcessor.java       | 85 +++++++++++++-------
 4 files changed, 179 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/45e1a34a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
index fe80c37..6817151 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/SimpleInput.java
@@ -38,7 +38,9 @@ import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.counters.TaskCounter;
@@ -74,6 +76,8 @@ public class SimpleInput implements Input {
   org.apache.hadoop.mapred.InputFormat oldInputFormat;
   org.apache.hadoop.mapred.RecordReader oldRecordReader;
 
+  protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
+  
   Object key;
   Object value;
   
@@ -101,6 +105,14 @@ public class SimpleInput implements Input {
       jobConf = new JobConf(conf);
     }
     
+    // Read split information.
+    TaskSplitMetaInfo[] allMetaInfo = readSplits(jobConf);
+    TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[task.getTaskAttemptId()
+        .getTaskID().getId()];
+    splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
+        thisTaskMetaInfo.getStartOffset());
+    
+    
     useNewApi = jobConf.getUseNewMapper();
     taskAttemptContext = task.getTaskAttemptContext();
     
@@ -118,7 +130,7 @@ public class SimpleInput implements Input {
         throw new IOException(cnfe);
       }
       
-      newInputSplit = getNewSplitDetails(task.getSplitIndex());
+      newInputSplit = getNewSplitDetails(splitMetaInfo);
       List<Statistics> matchedStats = null;
       if (newInputSplit instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
         matchedStats = MRTask.getFsStatistics(
@@ -131,7 +143,7 @@ public class SimpleInput implements Input {
     } else {
       oldInputFormat = jobConf.getInputFormat();
       org.apache.hadoop.mapred.InputSplit oldInputSplit =
-          getOldSplitDetails(task.getSplitIndex());
+          getOldSplitDetails(splitMetaInfo);
       
       List<Statistics> matchedStats = null;
       if (oldInputSplit instanceof FileSplit) {
@@ -365,4 +377,11 @@ public class SimpleInput implements Input {
     return newInputSplit;
   }
 
+  protected TaskSplitMetaInfo[] readSplits(Configuration conf)
+      throws IOException {
+    TaskSplitMetaInfo[] allTaskSplitMetaInfo;
+    allTaskSplitMetaInfo = SplitMetaInfoReaderTez.readSplitMetaInfo(conf,
+        FileSystem.getLocal(conf));
+    return allTaskSplitMetaInfo;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/45e1a34a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index dcb5035..ba13c4a 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -24,6 +24,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -46,7 +49,6 @@ import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
 import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -65,7 +67,9 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.common.records.ProceedToCompletionResponse;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.api.Input;
 import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.engine.records.OutputContext;
 import org.apache.tez.mapreduce.combine.MRCombiner;
@@ -111,8 +115,6 @@ public abstract class MRTask extends RunningTaskContext {
 
   private MRTaskReporter mrReporter;
 
-  protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
-
   /**
    * A Map where Key-> URIScheme and value->FileSystemStatisticUpdater
    */
@@ -722,10 +724,6 @@ public abstract class MRTask extends RunningTaskContext {
     return reporter.getCounter(FileInputFormatCounter.BYTES_READ);
   }
 
-  public TaskSplitIndex getSplitIndex() {
-    return splitMetaInfo;
-  }
-
   public JobContext getJobContext() {
     return jobContext;
   }
@@ -737,4 +735,70 @@ public abstract class MRTask extends RunningTaskContext {
   public TezEngineTaskContext getTezEngineTaskContext() {
     return tezEngineTaskContext;
   }
+  
+  protected FutureTask<Void> initInputAsync(Input input) {
+    FutureTask<Void> initInputFuture = new FutureTask<Void>(
+        new InitInputCallable(input));
+    new Thread(initInputFuture, "InitInputThread").start();
+    return initInputFuture;
+  }
+
+  protected FutureTask<Void> initOutputAsync(Output output) {
+    FutureTask<Void> initOutputFuture = new FutureTask<Void>(
+        new InitOutputCallable(output));
+    new Thread(initOutputFuture, "InitOutputThread").start();
+    return initOutputFuture;
+  }
+
+  protected class InitInputCallable implements Callable<Void> {
+    Input input;
+    InitInputCallable(Input input) {
+      this.input = input;
+    }
+    @Override
+    public Void call() throws IOException, InterruptedException {
+      input.initialize(jobConf, getTaskReporter());
+      LOG.info("Input initialized");
+      return null;
+    }
+  }
+  
+  protected class InitOutputCallable implements Callable<Void> {
+    Output output;
+    InitOutputCallable(Output output) {
+      this.output = output;
+    }
+    @Override
+    public Void call() throws IOException, InterruptedException {
+      output.initialize(jobConf, getTaskReporter());
+      LOG.info("Output initialized");
+      return null;
+    }
+  }
+  
+  private void waitForIOInitialization(FutureTask<Void> future)
+      throws InterruptedException, IOException {
+    try {
+      future.get();
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof InterruptedException) {
+        throw (InterruptedException) e.getCause();
+      } else if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      } else {
+        throw new RuntimeException("UnknownException from I/O initialization",
+            e.getCause());
+      }
+    }
+  }
+
+  protected void waitForInputInitialization(FutureTask<Void> future)
+      throws InterruptedException, IOException {
+    waitForIOInitialization(future);
+  }
+  
+  protected void waitForOutputInitialization(FutureTask<Void> future)
+      throws InterruptedException, IOException {
+    waitForIOInitialization(future);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/45e1a34a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index 002f344..2b6789a 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -18,11 +18,11 @@
 package org.apache.tez.mapreduce.processor.map;
 
 import java.io.IOException;
+import java.util.concurrent.FutureTask;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.MapRunnable;
@@ -31,9 +31,6 @@ import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezEngineTaskContext;
@@ -76,14 +73,7 @@ public class MapProcessor extends MRTask implements Processor {
       final Input[] ins,
       final Output[] outs)
           throws IOException, InterruptedException {
-    
-    // Read split information.
-    TaskSplitMetaInfo[] allMetaInfo = readSplits();
-    TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[tezEngineTaskContext
-        .getTaskAttemptId().getTaskID().getId()];
-    splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
-        thisTaskMetaInfo.getStartOffset());
-    
+
     MRTaskReporter reporter = new MRTaskReporter(getTaskReporter());
     boolean useNewApi = jobConf.getUseNewMapper();
     initTask(jobConf, taskAttemptId.getTaskID().getVertexID().getDAGId(),
@@ -97,20 +87,19 @@ public class MapProcessor extends MRTask implements Processor {
     }
     Input in = ins[0];
     Output out = outs[0];
-
-    if (in instanceof SimpleInput) {
-      ((SimpleInput)in).setTask(this);
-    }
     
+    // Sanity check
+    if (!(in instanceof SimpleInput)) {
+      throw new IOException("Unknown input! - " + in.getClass());
+    }
+    SimpleInput input = (SimpleInput)in;
+    input.setTask(this);
+
     if (out instanceof SimpleOutput) {
       ((SimpleOutput)out).setTask(this);
     } else if (out instanceof SortingOutput) {
       ((SortingOutput)out).setTask(this);
     }
-    
-    
-    in.initialize(jobConf, getTaskReporter());
-    out.initialize(jobConf, getTaskReporter());
 
     // If there are no reducers then there won't be any sort. Hence the map 
     // phase will govern the entire attempt's progress.
@@ -122,12 +111,7 @@ public class MapProcessor extends MRTask implements Processor {
       mapPhase = getProgress().addPhase("map");
     }
 
-    // Sanity check
-    if (!(in instanceof SimpleInput)) {
-      throw new IOException("Unknown input! - " + in.getClass());
-    }
-    SimpleInput input = (SimpleInput)in;
-    
+
     if (useNewApi) {
       runNewMapper(jobConf, reporter, input, out, getTaskReporter());
     } else {
@@ -150,6 +134,9 @@ public class MapProcessor extends MRTask implements Processor {
       final Master master
       ) throws IOException, InterruptedException {
     
+    FutureTask<Void> initInputFuture = initInputAsync(input);
+    FutureTask<Void> initOutputFuture = initOutputAsync(output);
+    
     RecordReader in = new OldRecordReader(input);
         
     int numReduceTasks = tezEngineTaskContext.getOutputSpecList().get(0)
@@ -161,6 +148,13 @@ public class MapProcessor extends MRTask implements Processor {
     MapRunnable runner =
         (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
 
+    // Wait for input/output to be initialized before starting processing.
+    LOG.info("Waiting on input initialization");
+    waitForInputInitialization(initInputFuture);
+
+    LOG.info("Waiting on output initialization");
+    waitForOutputInitialization(initOutputFuture);
+
     try {
       runner.run(in, collector, (Reporter)reporter);
       mapPhase.complete();
@@ -183,6 +177,10 @@ public class MapProcessor extends MRTask implements Processor {
       final Master master
       ) throws IOException, InterruptedException {
     // make a task context so we can get the classes
+    
+    FutureTask<Void> initInputFuture = initInputAsync(in);
+    FutureTask<Void> initOutputFuture = initOutputAsync(out);
+    
     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
         new TaskAttemptContextImpl(job, taskAttemptId, reporter);
 
@@ -205,6 +203,13 @@ public class MapProcessor extends MRTask implements Processor {
     org.apache.hadoop.mapreduce.RecordWriter output = 
         new NewOutputCollector(out);
 
+    // Wait for input/output to be initialized before starting processing.
+    LOG.info("Waiting on input initialization");
+    waitForInputInitialization(initInputFuture);
+
+    LOG.info("Waiting on output initialization");
+    waitForOutputInitialization(initOutputFuture);
+
     org.apache.hadoop.mapreduce.InputSplit split = in.getNewInputSplit();
     
     org.apache.hadoop.mapreduce.MapContext 
@@ -374,11 +379,4 @@ public class MapProcessor extends MRTask implements Processor {
     return reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
 
   }
-  
-  protected TaskSplitMetaInfo[] readSplits() throws IOException {
-    TaskSplitMetaInfo[] allTaskSplitMetaInfo;
-    allTaskSplitMetaInfo = SplitMetaInfoReaderTez.readSplitMetaInfo(getConf(),
-        FileSystem.getLocal(getConf()));
-    return allTaskSplitMetaInfo;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/45e1a34a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 0fd404d..25db8ee 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -19,6 +19,7 @@ package org.apache.tez.mapreduce.processor.reduce;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.FutureTask;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -142,14 +143,6 @@ implements Processor {
       ((SortingOutput)out).setTask(this);
     }
 
-    in.initialize(jobConf, getTaskReporter());
-    out.initialize(jobConf, getTaskReporter());
-
-    sortPhase  = getProgress().addPhase("sort");
-    reducePhase = getProgress().addPhase("reduce");
-    sortPhase.complete();                         // sort is complete
-    setPhase(TezTaskStatus.Phase.REDUCE); 
-
     this.statusUpdate();
     
     Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
@@ -204,9 +197,12 @@ implements Processor {
       Class valueClass,
       final Output output) throws IOException, InterruptedException {
     
+    FutureTask<Void> initInputFuture = initInputAsync(input);
+    FutureTask<Void> initOutputFuture = initOutputAsync(output);
+
     Reducer reducer = 
         ReflectionUtils.newInstance(job.getReducerClass(), job);
-    
+
     // make output collector
 
     OutputCollector collector = 
@@ -221,6 +217,13 @@ implements Processor {
       }
     };
 
+    // Wait for input/output to be initialized before starting processing.
+    LOG.info("Waiting on input initialization");
+    waitForInputInitialization(initInputFuture);
+
+    LOG.info("Waiting on output initialization");
+    waitForOutputInitialization(initOutputFuture);
+
     // apply reduce function
     try {
       ReduceValuesIterator values = 
@@ -298,6 +301,26 @@ implements Processor {
       final Output out
       ) throws IOException,InterruptedException, 
       ClassNotFoundException {
+    
+    FutureTask<Void> initInputFuture = initInputAsync(input);
+    FutureTask<Void> initOutputFuture = initOutputAsync(out);
+    
+    // make a task context so we can get the classes
+    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
+        new TaskAttemptContextImpl(job, taskAttemptId, reporter);
+    
+    // make a reducer
+    org.apache.hadoop.mapreduce.Reducer reducer =
+        (org.apache.hadoop.mapreduce.Reducer)
+        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
+
+    // Wait for input/output to be initialized before starting processing.
+    LOG.info("Waiting on input initialization");
+    waitForInputInitialization(initInputFuture);
+
+    LOG.info("Waiting on output initialization");
+    waitForOutputInitialization(initOutputFuture);
+
     // wrap value iterator to report progress.
     final TezRawKeyValueIterator rawIter = input.getIterator();
     TezRawKeyValueIterator rIter = new TezRawKeyValueIterator() {
@@ -319,31 +342,22 @@ implements Processor {
         return ret;
       }
     };
-    
-    // make a task context so we can get the classes
-    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
-        new TaskAttemptContextImpl(job, taskAttemptId, reporter);
-    
-    // make a reducer
-    org.apache.hadoop.mapreduce.Reducer reducer =
-        (org.apache.hadoop.mapreduce.Reducer)
-        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
 
     org.apache.hadoop.mapreduce.RecordWriter trackedRW = 
         new org.apache.hadoop.mapreduce.RecordWriter() {
 
-          @Override
-          public void write(Object key, Object value) throws IOException,
-              InterruptedException {
-            out.write(key, value);
-          }
+      @Override
+      public void write(Object key, Object value) throws IOException,
+      InterruptedException {
+        out.write(key, value);
+      }
 
-          @Override
-          public void close(TaskAttemptContext context) throws IOException,
-              InterruptedException {
-            out.close();
-          }
-        };
+      @Override
+      public void close(TaskAttemptContext context) throws IOException,
+      InterruptedException {
+        out.close();
+      }
+    };
 
     org.apache.hadoop.mapreduce.Reducer.Context reducerContext = 
         createReduceContext(
@@ -354,6 +368,9 @@ implements Processor {
             committer,
             reporter, comparator, keyClass,
             valueClass);
+    
+    
+    
     reducer.run(reducerContext);
     trackedRW.close(reducerContext);
   }
@@ -374,4 +391,14 @@ implements Processor {
   public TezCounter getInputRecordsCounter() {
     return reporter.getCounter(TaskCounter.REDUCE_INPUT_GROUPS);
   }
+
+  @Override
+  protected void waitForInputInitialization(FutureTask<Void> future)
+      throws InterruptedException, IOException {
+    super.waitForInputInitialization(future);
+    sortPhase = getProgress().addPhase("sort");
+    reducePhase = getProgress().addPhase("reduce");
+    sortPhase.complete(); // sort is complete
+    setPhase(TezTaskStatus.Phase.REDUCE);
+  }
 }


Mime
View raw message