tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-599. Change MRInputLegacy to initialize the RecordReader when requested by a Processor. (sseth)
Date Wed, 06 Nov 2013 22:07:28 GMT
Updated Branches:
  refs/heads/master 98ca0091d -> 6efb9a4cf


TEZ-599. Change MRInputLegacy to initialize the RecordReader when
requested by a Processor. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/6efb9a4c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/6efb9a4c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/6efb9a4c

Branch: refs/heads/master
Commit: 6efb9a4cf8b5b3db48f6b9c15fb3b0dbbe05a371
Parents: 98ca009
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Nov 6 14:07:07 2013 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Nov 6 14:07:07 2013 -0800

----------------------------------------------------------------------
 .../org/apache/tez/dag/app/dag/impl/VertexImpl.java  |  2 +-
 .../java/org/apache/tez/mapreduce/input/MRInput.java | 15 ++++++++-------
 .../apache/tez/mapreduce/input/MRInputLegacy.java    |  6 ++++++
 3 files changed, 15 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6efb9a4c/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 257984f..4d11c5d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1042,7 +1042,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           LOG.error("task failure accounting error.  terminationCause=TASK_FAILURE but vertex.failedTaskCount
== 0");
         }
         vertex.setFinishTime();
-        String diagnosticMsg = "Vertex killed as one or more tasks failed. "
+        String diagnosticMsg = "Vertex failed as one or more tasks failed. "
             + "failedTasks:"
             + vertex.failedTaskCount;
         LOG.info(diagnosticMsg);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6efb9a4c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 826ba23..c84d48c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -132,11 +132,17 @@ public class MRInput implements LogicalInput {
     this.fileInputByteCounter = inputContext.getCounters().findCounter(FileInputFormatCounter.BYTES_READ);
     
     useNewApi = this.jobConf.getUseNewMapper();
-    this.splitInfoViaEvents = conf.getBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
+    this.splitInfoViaEvents = jobConf.getBoolean(MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
         MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS_DEFAULT);
     LOG.info("Using New mapreduce API: " + useNewApi
         + ", split information via event: " + splitInfoViaEvents);
 
+    initializeInternal();
+    return null;
+  }
+
+  @Private
+  void initializeInternal() throws IOException {
     // Primarily for visibility
     rrLock.lock();
     try {
@@ -146,10 +152,9 @@ public class MRInput implements LogicalInput {
         } else {
           setupOldInputFormat();
         }
-
       } else {
         // Read split information.
-        TaskSplitMetaInfo[] allMetaInfo = readSplits(conf);
+        TaskSplitMetaInfo[] allMetaInfo = readSplits(jobConf);
         TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[inputContext
             .getTaskIndex()];
         this.splitMetaInfo = new TaskSplitIndex(
@@ -168,13 +173,9 @@ public class MRInput implements LogicalInput {
     } finally {
       rrLock.unlock();
     }
-
     LOG.info("Initialzed MRInput: " + inputContext.getSourceVertexName());
-    return null;
   }
 
-  
-  
   private void setupOldInputFormat() {
     oldInputFormat = this.jobConf.getInputFormat();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/6efb9a4c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
index fe89607..5c2e515 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInputLegacy.java
@@ -38,6 +38,11 @@ public class MRInputLegacy extends MRInput {
   private volatile boolean inited = false;
   private ReentrantLock eventLock = new ReentrantLock();
   private Condition eventCondition = eventLock.newCondition();
+
+  @Private
+  protected void initializeInternal() throws IOException {
+    LOG.info("MRInputLegacy deferring initialization");
+  }
   
   @Private
   public org.apache.hadoop.mapreduce.InputSplit getNewInputSplit() {
@@ -52,6 +57,7 @@ public class MRInputLegacy extends MRInput {
   
   @LimitedPrivate("hive")
   public void init() throws IOException {
+    super.initializeInternal();
     checkAndAwaitRecordReaderInitialization();
   }
   


Mime
View raw message