tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: Allow only a single instance of SimpleInput.RecordReader to be created (part of TEZ-398). (sseth)
Date Mon, 16 Sep 2013 17:41:59 GMT
Updated Branches:
  refs/heads/TEZ-398 340426c97 -> ff6de762f


Allow only a single instance of SimpleInput.RecordReader to be created
(part of TEZ-398). (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/ff6de762
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/ff6de762
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/ff6de762

Branch: refs/heads/TEZ-398
Commit: ff6de762fc8ae9ef78f6f371fc7b449abb148e90
Parents: 340426c
Author: Siddharth Seth <sseth@apache.org>
Authored: Mon Sep 16 10:41:43 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Mon Sep 16 10:41:43 2013 -0700

----------------------------------------------------------------------
 .../tez/mapreduce/newinput/SimpleInput.java     | 134 +++++++++++--------
 .../mapreduce/newinput/SimpleInputLegacy.java   |   1 +
 2 files changed, 77 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ff6de762/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
index 8f11739..9969b81 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInput.java
@@ -54,6 +54,8 @@ import org.apache.tez.mapreduce.common.Utils;
 import org.apache.tez.mapreduce.hadoop.newmapred.MRReporter;
 import org.apache.tez.mapreduce.hadoop.newmapreduce.TaskAttemptContextImpl;
 
+import com.google.common.base.Preconditions;
+
 /**
  * {@link SimpleInput} is an {@link Input} which provides key/values pairs
  * for the consumer.
@@ -71,6 +73,7 @@ public class SimpleInput implements LogicalInput {
   
   private JobConf jobConf;
   private Configuration incrementalConf;
+  private boolean recordReaderCreated = false;
   
   boolean useNewApi;
   
@@ -98,7 +101,7 @@ public class SimpleInput implements LogicalInput {
     Configuration conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
     this.jobConf = new JobConf(conf);
     
- // Read split information.
+    // Read split information.
     TaskSplitMetaInfo[] allMetaInfo = readSplits(conf);
     TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[inputContext.getTaskIndex()];
     this.splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
@@ -163,63 +166,11 @@ public class SimpleInput implements LogicalInput {
 
   @Override
   public KVReader getReader() throws IOException {
-    return new KVReader() {
-      
-      Object key;
-      Object value;
-      
-      // Setup the values iterator once, and set value on the same object each time
-      // to prevent lots of objects being created.
-      private SimpleValueIterator valueIterator = new SimpleValueIterator();
-      private SimpleIterable valueIterable = new SimpleIterable(valueIterator);
-
-
-      private final boolean localNewApi = useNewApi;
-      
-      @SuppressWarnings("unchecked")
-      @Override
-      public boolean next() throws IOException {
-        boolean hasNext = false;
-        long bytesInPrev = getInputBytes();
-        if (localNewApi) {
-          try {
-            hasNext = newRecordReader.nextKeyValue();
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new IOException("Interrupted while checking for next key-value", e);
-          }
-        } else {
-          hasNext = oldRecordReader.next(key, value);
-        }
-        long bytesInCurr = getInputBytes();
-        fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
-        
-        if (hasNext) {
-          inputRecordCounter.increment(1);
-        }
-        
-        return hasNext;
-      }
-
-      @Override
-      public KVRecord getCurrentKV() throws IOException {
-        KVRecord kvRecord = null;
-        if (localNewApi) {
-          try {
-            valueIterator.setValue(newRecordReader.getCurrentValue());
-            kvRecord = new KVRecord(newRecordReader.getCurrentKey(), valueIterable);
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new IOException("Interrupted while fetching next key-value", e);
-          }
-          
-        } else {
-          valueIterator.setValue(value);
-          kvRecord = new KVRecord(key, valueIterable);
-        }
-        return kvRecord;
-      }
-    };
+    Preconditions
+        .checkState(recordReaderCreated == false,
+            "Only a single instance of record reader can be created for this input.");
+    recordReaderCreated = true;
+    return new MRInputKVReader();
   }
 
 
@@ -416,4 +367,71 @@ public class SimpleInput implements LogicalInput {
         FileSystem.getLocal(conf));
     return allTaskSplitMetaInfo;
   }
+  
+  private class MRInputKVReader implements KVReader {
+    
+    Object key;
+    Object value;
+
+    private SimpleValueIterator valueIterator = new SimpleValueIterator();
+    private SimpleIterable valueIterable = new SimpleIterable(valueIterator);
+
+    private final boolean localNewApi;
+    
+    MRInputKVReader() {
+      localNewApi = useNewApi;
+      if (!localNewApi) {
+        key = oldRecordReader.createKey();
+        value =oldRecordReader.createValue();
+      }
+    }
+    
+    // Setup the values iterator once, and set value on the same object each time
+    // to prevent lots of objects being created.
+
+    
+    @SuppressWarnings("unchecked")
+    @Override
+    public boolean next() throws IOException {
+      boolean hasNext = false;
+      long bytesInPrev = getInputBytes();
+      if (localNewApi) {
+        try {
+          hasNext = newRecordReader.nextKeyValue();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted while checking for next key-value", e);
+        }
+      } else {
+        hasNext = oldRecordReader.next(key, value);
+      }
+      long bytesInCurr = getInputBytes();
+      fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
+      
+      if (hasNext) {
+        inputRecordCounter.increment(1);
+      }
+      
+      return hasNext;
+    }
+
+    @Override
+    public KVRecord getCurrentKV() throws IOException {
+      KVRecord kvRecord = null;
+      if (localNewApi) {
+        try {
+          valueIterator.setValue(newRecordReader.getCurrentValue());
+          kvRecord = new KVRecord(newRecordReader.getCurrentKey(), valueIterable);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted while fetching next key-value", e);
+        }
+        
+      } else {
+        valueIterator.setValue(value);
+        kvRecord = new KVRecord(key, valueIterable);
+      }
+      return kvRecord;
+    }
+  };
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ff6de762/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInputLegacy.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInputLegacy.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInputLegacy.java
index 33cdaab..8f07a38 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInputLegacy.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newinput/SimpleInputLegacy.java
@@ -29,6 +29,7 @@ public class SimpleInputLegacy extends SimpleInput {
   }  
   
   @SuppressWarnings("rawtypes")
+  @Private
   public RecordReader getOldRecordReader() {
     return this.oldRecordReader;
   }


Mime
View raw message