tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-560. Change MRInputLegacy to avoid RecordReader initialization in the handleEvent call. (sseth)
Date Tue, 15 Oct 2013 01:13:22 GMT
Updated Branches:
  refs/heads/master d154b49fc -> f30470bc7


TEZ-560. Change MRInputLegacy to avoid RecordReader initialization in the
handleEvent call. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/f30470bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/f30470bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/f30470bc

Branch: refs/heads/master
Commit: f30470bc7e5f6b3cfa293a524ddb2bbf595ac752
Parents: d154b49
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon Oct 14 18:12:54 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Mon Oct 14 18:12:54 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/mapreduce/input/MRInput.java | 170 ++++++++++++-------
 .../tez/mapreduce/input/MRInputLegacy.java      |  52 ++++++
 .../mapreduce/processor/map/MapProcessor.java   |   1 +
 3 files changed, 161 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f30470bc/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 1de90a0..9e16287 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -25,6 +25,8 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -69,6 +71,8 @@ import com.google.common.base.Preconditions;
  *
  * It is compatible with all standard Apache Hadoop MapReduce 
  * {@link InputFormat} implementations.
+ * 
+ * This class is not meant to be extended by external projects.
  */
 
 public class MRInput implements LogicalInput {
@@ -79,11 +83,11 @@ public class MRInput implements LogicalInput {
   Condition rrInited = rrLock.newCondition();
   private TezInputContext inputContext;
   
-  private boolean eventReceived = false;
+  private volatile boolean eventReceived = false;
   
   private JobConf jobConf;
   private Configuration incrementalConf;
-  private boolean recordReaderCreated = false;
+  private boolean readerCreated = false;
   
   boolean useNewApi;
   
@@ -92,14 +96,14 @@ public class MRInput implements LogicalInput {
   @SuppressWarnings("rawtypes")
   private org.apache.hadoop.mapreduce.InputFormat newInputFormat;
   @SuppressWarnings("rawtypes")
-  private volatile org.apache.hadoop.mapreduce.RecordReader newRecordReader;
-  protected volatile org.apache.hadoop.mapreduce.InputSplit newInputSplit;
+  private org.apache.hadoop.mapreduce.RecordReader newRecordReader;
+  protected org.apache.hadoop.mapreduce.InputSplit newInputSplit;
   
   @SuppressWarnings("rawtypes")
   private InputFormat oldInputFormat;
   @SuppressWarnings("rawtypes")
-  protected volatile RecordReader oldRecordReader;
-  private volatile InputSplit oldInputSplit;
+  protected RecordReader oldRecordReader;
+  private InputSplit oldInputSplit;
 
   protected TaskSplitIndex splitMetaInfo = new TaskSplitIndex();
   
@@ -107,6 +111,10 @@ public class MRInput implements LogicalInput {
   private TezCounter fileInputByteCounter; 
   private List<Statistics> fsStats;
   
+  @Private
+  volatile boolean splitInfoViaEvents;
+  
+  
   @Override
   public List<Event> initialize(TezInputContext inputContext) throws IOException {
     this.inputContext = inputContext;
@@ -125,36 +133,41 @@ public class MRInput implements LogicalInput {
     this.fileInputByteCounter = inputContext.getCounters().findCounter(FileInputFormatCounter.BYTES_READ);
     
     useNewApi = this.jobConf.getUseNewMapper();
-    boolean viaEvents = conf.getBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
+    this.splitInfoViaEvents = conf.getBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
         MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS_DEFAULT);
     LOG.info("Using New mapreduce API: " + useNewApi
-        + ", split information from events: " + viaEvents);
+        + ", split information via event: " + splitInfoViaEvents);
 
-    if (viaEvents) {
-      if (useNewApi) {
-        setupNewInputFormat();
-      } else {
-        setupOldInputFormat();
-      }
+    // Primarily for visibility
+    rrLock.lock();
+    try {
+      if (splitInfoViaEvents) {
+        if (useNewApi) {
+          setupNewInputFormat();
+        } else {
+          setupOldInputFormat();
+        }
 
-    } else {
-      // Read split information.
-      TaskSplitMetaInfo[] allMetaInfo = readSplits(conf);
-      TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[inputContext
-          .getTaskIndex()];
-      this.splitMetaInfo = new TaskSplitIndex(
-          thisTaskMetaInfo.getSplitLocation(),
-          thisTaskMetaInfo.getStartOffset());
-      if (useNewApi) {
-        setupNewInputFormat();
-        newInputSplit = getNewSplitDetailsFromDisk(splitMetaInfo);
-        setupNewRecordReader();
       } else {
-        setupOldInputFormat();
-         oldInputSplit = getOldSplitDetailsFromDisk(splitMetaInfo);
-         setupOldRecordReader();
+        // Read split information.
+        TaskSplitMetaInfo[] allMetaInfo = readSplits(conf);
+        TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[inputContext
+            .getTaskIndex()];
+        this.splitMetaInfo = new TaskSplitIndex(
+            thisTaskMetaInfo.getSplitLocation(),
+            thisTaskMetaInfo.getStartOffset());
+        if (useNewApi) {
+          setupNewInputFormat();
+          newInputSplit = getNewSplitDetailsFromDisk(splitMetaInfo);
+          setupNewRecordReader();
+        } else {
+          setupOldInputFormat();
+          oldInputSplit = getOldSplitDetailsFromDisk(splitMetaInfo);
+          setupOldRecordReader();
+        }
       }
-
+    } finally {
+      rrLock.unlock();
     }
 
     LOG.info("Initialzed MRInput: " + inputContext.getSourceVertexName());
@@ -216,23 +229,24 @@ public class MRInput implements LogicalInput {
   @Override
   public KeyValueReader getReader() throws IOException {
     Preconditions
-        .checkState(recordReaderCreated == false,
+        .checkState(readerCreated == false,
             "Only a single instance of record reader can be created for this input.");
-    recordReaderCreated = true;
-    if (newRecordReader == null && oldRecordReader == null) {
-      rrLock.lock();
-      try {
-        LOG.info("Awaiting RecordReader initialization");
+    readerCreated = true;
+    rrLock.lock();
+    try {
+      if (newRecordReader == null && oldRecordReader == null)
         try {
+          LOG.info("Awaiting RecordReader initialization");
           rrInited.await();
-        } catch (InterruptedException e) {
-          throw new IOException("Interrupted awaiting RecordReader setup", e);
+        } catch (Exception e) {
+          throw new IOException("Interrupted waiting for RecordReader initiailization");
         }
-      } finally {
-        rrLock.unlock();
-      }
+    } finally {
+      rrLock.unlock();
     }
-    LOG.info("Creating reader for MRInput: " + inputContext.getSourceVertexName());
+
+    LOG.info("Creating reader for MRInput: "
+        + inputContext.getSourceVertexName());
     return new MRInputKVReader();
   }
 
@@ -245,28 +259,15 @@ public class MRInput implements LogicalInput {
               + eventReceived);
     }
     Event event = inputEvents.iterator().next();
-    MRSplitProto splitProto = MRSplitProto
-        .parseFrom(((RootInputDataInformationEvent) event).getUserPayload());
-    if (useNewApi) {
-      newInputSplit = getNewSplitDetailsFromEvent(splitProto);
-      LOG.info("Split Details -> SplitClass: "
-          + newInputSplit.getClass().getName() + ", NewSplit: " + newInputSplit);
-      setupNewRecordReader();
-    } else {
-      oldInputSplit = getOldSplitDetailsFromEvent(splitProto);
-      LOG.info("Split Details -> SplitClass: "
-          + oldInputSplit.getClass().getName() + ", OldSplit: " + oldInputSplit);
-      setupOldRecordReader();
-    }
-    rrLock.lock();
-    try {
-      LOG.info("Notifying on inited to unblock reader");
-      rrInited.signal();
-    } finally {
-      rrLock.unlock();
-    }
+    Preconditions.checkArgument(event instanceof RootInputDataInformationEvent,
+        getClass().getSimpleName()
+            + " can only handle a single event of type: "
+            + RootInputDataInformationEvent.class.getSimpleName());
+
+    processSplitEvent((RootInputDataInformationEvent)event);
   }
 
+  
 
   @Override
   public void setNumPhysicalInputs(int numInputs) {
@@ -300,6 +301,8 @@ public class MRInput implements LogicalInput {
     }
     return null;
   }
+  
+  
 
   public float getProgress() throws IOException, InterruptedException {
     if (useNewApi) {
@@ -313,6 +316,49 @@ public class MRInput implements LogicalInput {
   private TaskAttemptContext createTaskAttemptContext() {
     return new TaskAttemptContextImpl(this.jobConf, inputContext, true);
   }
+  
+  void processSplitEvent(RootInputDataInformationEvent event)
+      throws IOException {
+    rrLock.lock();
+    try {
+      initFromEventInternal(event);
+      LOG.info("Notifying on RecordReader Initialized");
+      rrInited.signal();
+    } finally {
+      rrLock.unlock();
+    }
+  }
+
+  @Private
+  void initFromEvent(RootInputDataInformationEvent initEvent)
+      throws IOException {
+    rrLock.lock();
+    try {
+      initFromEventInternal(initEvent);
+    } finally {
+      rrLock.unlock();
+    }
+  }
+  
+  private void initFromEventInternal(RootInputDataInformationEvent initEvent)
+      throws IOException {
+    LOG.info("Initializing RecordReader from event");
+    Preconditions.checkState(initEvent != null, "InitEvent must be specified");
+    MRSplitProto splitProto = MRSplitProto
+        .parseFrom(initEvent.getUserPayload());
+    if (useNewApi) {
+      newInputSplit = getNewSplitDetailsFromEvent(splitProto);
+      LOG.info("Split Details -> SplitClass: "
+          + newInputSplit.getClass().getName() + ", NewSplit: " + newInputSplit);
+      setupNewRecordReader();
+    } else {
+      oldInputSplit = getOldSplitDetailsFromEvent(splitProto);
+      LOG.info("Split Details -> SplitClass: "
+          + oldInputSplit.getClass().getName() + ", OldSplit: " + oldInputSplit);
+      setupOldRecordReader();
+    }
+    LOG.info("Initialized RecordReader from event");
+  }
 
   private InputSplit getOldSplitDetailsFromEvent(MRSplitProto splitProto)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f30470bc/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
index 5923746..24d1cb8 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
@@ -18,11 +18,27 @@
 
 package org.apache.tez.mapreduce.input;
 
+import java.io.IOException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.mapred.RecordReader;
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
 
+@LimitedPrivate("Hive")
 public class MRInputLegacy extends MRInput {
 
+  private static final Log LOG = LogFactory.getLog(MRInputLegacy.class);
+  
+  private RootInputDataInformationEvent initEvent;
+  private volatile boolean inited = false;
+  private ReentrantLock eventLock = new ReentrantLock();
+  private Condition eventCondition = eventLock.newCondition();
+  
   @Private
   public org.apache.hadoop.mapreduce.InputSplit getNewInputSplit() {
     return this.newInputSplit;
@@ -33,4 +49,40 @@ public class MRInputLegacy extends MRInput {
   public RecordReader getOldRecordReader() {
     return this.oldRecordReader;
   }
+  
+  @LimitedPrivate("hive")
+  public void init() throws IOException {
+    eventLock.lock();
+    try {
+      if (splitInfoViaEvents && !inited) {
+        if (initEvent == null) {
+          LOG.info("Awaiting init event before initializing record reader");
+          try {
+            eventCondition.await();
+          } catch (InterruptedException e) {
+            throw new IOException("Interrupted while awaiting init event", e);
+          }
+        }
+        initFromEvent(initEvent);
+        inited = true;
+      } else {
+        // Already inited
+        return;
+      }
+    } finally {
+      eventLock.unlock();
+    }
+  }
+
+  @Override
+  void processSplitEvent(RootInputDataInformationEvent event) {
+    eventLock.lock();
+    try {
+      initEvent = event;
+      // Don't process event, but signal in case init is waiting on the event.
+      eventCondition.signal();
+    } finally {
+      eventLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/f30470bc/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index 890f1fa..801cfcc 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -106,6 +106,7 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor {
           "Only Simple Input supported. Input: " + in.getClass()));
     }
     MRInputLegacy input = (MRInputLegacy)in;
+    input.init();
     Configuration incrementalConf = input.getConfigUpdates();
     if (incrementalConf != null) {
       for (Entry<String, String> entry : incrementalConf) {


Mime
View raw message