tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject [2/2] git commit: TEZ-508. Add interfaces for KeyValueReader and KeyValuesReader. (sseth)
Date Fri, 27 Sep 2013 19:12:11 GMT
TEZ-508. Add interfaces for KeyValueReader and KeyValuesReader. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/034ca0a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/034ca0a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/034ca0a9

Branch: refs/heads/master
Commit: 034ca0a95175eebaec4d2a470027ed85b2051836
Parents: 9fbc8d3
Author: Siddharth Seth <sseth@apache.org>
Authored: Fri Sep 27 12:11:51 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Fri Sep 27 12:11:51 2013 -0700

----------------------------------------------------------------------
 .../processor/FilterByWordInputProcessor.java   | 10 ++--
 .../processor/FilterByWordOutputProcessor.java  | 10 ++--
 .../org/apache/tez/mapreduce/input/MRInput.java | 62 +++-----------------
 .../apache/tez/mapreduce/output/MROutput.java   |  6 +-
 .../mapreduce/processor/map/MapProcessor.java   | 22 +++----
 .../processor/reduce/ReduceProcessor.java       | 18 +++---
 .../tez/runtime/library/api/KVReader.java       | 59 -------------------
 .../tez/runtime/library/api/KVWriter.java       | 40 -------------
 .../tez/runtime/library/api/KeyValueReader.java | 60 +++++++++++++++++++
 .../tez/runtime/library/api/KeyValueWriter.java | 40 +++++++++++++
 .../runtime/library/api/KeyValuesReader.java    | 59 +++++++++++++++++++
 .../broadcast/input/BroadcastKVReader.java      | 60 +++----------------
 .../broadcast/output/FileBasedKVWriter.java     |  4 +-
 .../library/input/ShuffledMergedInput.java      |  6 +-
 .../library/input/ShuffledUnorderedKVInput.java |  4 +-
 .../library/output/InMemorySortedOutput.java    |  4 +-
 .../library/output/OnFileSortedOutput.java      |  6 +-
 .../library/output/OnFileUnorderedKVOutput.java |  4 +-
 .../output/TestOnFileUnorderedKVOutput.java     |  4 +-
 19 files changed, 224 insertions(+), 254 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
index c9517aa..8e4dea9 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
@@ -36,8 +36,8 @@ import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.TezProcessorContext;
-import org.apache.tez.runtime.library.api.KVReader;
-import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
 
 public class FilterByWordInputProcessor implements LogicalIOProcessor {
@@ -107,12 +107,12 @@ public class FilterByWordInputProcessor implements LogicalIOProcessor
{
       }
     }
 
-    KVReader kvReader = mrInput.getReader();
-    KVWriter kvWriter = kvOutput.getWriter();
+    KeyValueReader kvReader = mrInput.getReader();
+    KeyValueWriter kvWriter = kvOutput.getWriter();
 
     while (kvReader.next()) {
       Object key = kvReader.getCurrentKey();
-      Object val = kvReader.getCurrentValues().iterator().next();
+      Object val = kvReader.getCurrentValue();
 
       Text valText = (Text) val;
       String readVal = valText.toString();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
index ac1101d..e56e2a7 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordOutputProcessor.java
@@ -30,8 +30,8 @@ import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.TezProcessorContext;
-import org.apache.tez.runtime.library.api.KVReader;
-import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
 
 
@@ -83,11 +83,11 @@ public class FilterByWordOutputProcessor implements LogicalIOProcessor
{
     ShuffledUnorderedKVInput kvInput = (ShuffledUnorderedKVInput) li;
     MROutput mrOutput = (MROutput) lo;
 
-    KVReader kvReader = kvInput.getReader();
-    KVWriter kvWriter = mrOutput.getWriter();
+    KeyValueReader kvReader = kvInput.getReader();
+    KeyValueWriter kvWriter = mrOutput.getWriter();
     while (kvReader.next()) {
       Object key = kvReader.getCurrentKey();
-      Object value = kvReader.getCurrentValues().iterator().next();
+      Object value = kvReader.getCurrentValue();
 
       kvWriter.write(key, value);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 0b4ae7f..ef0de5c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -18,7 +18,6 @@
 package org.apache.tez.mapreduce.input;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -39,9 +38,9 @@ import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
@@ -53,7 +52,7 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.TezInputContext;
-import org.apache.tez.runtime.library.api.KVReader;
+import org.apache.tez.runtime.library.api.KeyValueReader;
 
 import com.google.common.base.Preconditions;
 
@@ -167,7 +166,7 @@ public class MRInput implements LogicalInput {
   }
 
   @Override
-  public KVReader getReader() throws IOException {
+  public KeyValueReader getReader() throws IOException {
     Preconditions
         .checkState(recordReaderCreated == false,
             "Only a single instance of record reader can be created for this input.");
@@ -227,46 +226,7 @@ public class MRInput implements LogicalInput {
   private TaskAttemptContext createTaskAttemptContext() {
     return new TaskAttemptContextImpl(this.jobConf, inputContext, true);
   }
-  
-
-  private static class SimpleValueIterator implements Iterator<Object> {
-
-    private Object value;
-
-    public void setValue(Object value) {
-      this.value = value;
-    }
-
-    public boolean hasNext() {
-      return value != null;
-    }
-
-    public Object next() {
-      Object value = this.value;
-      this.value = null;
-      return value;
-    }
-
-    public void remove() {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  private static class SimpleIterable implements Iterable<Object> {
-    private final Iterator<Object> iterator;
-    public SimpleIterable(Iterator<Object> iterator) {
-      this.iterator = iterator;
-    }
-
-    @Override
-    public Iterator<Object> iterator() {
-      return iterator;
-    }
-  }
 
-
-
-  
   @SuppressWarnings("unchecked")
   private InputSplit getOldSplitDetails(TaskSplitIndex splitMetaInfo)
       throws IOException {
@@ -373,14 +333,11 @@ public class MRInput implements LogicalInput {
     return allTaskSplitMetaInfo;
   }
   
-  private class MRInputKVReader implements KVReader {
+  private class MRInputKVReader implements KeyValueReader {
     
     Object key;
     Object value;
 
-    private SimpleValueIterator valueIterator = new SimpleValueIterator();
-    private SimpleIterable valueIterable = new SimpleIterable(valueIterator);
-
     private final boolean localNewApi;
     
     MRInputKVReader() {
@@ -432,19 +389,18 @@ public class MRInput implements LogicalInput {
         return key;
       }
     }
-    
+
     @Override
-    public Iterable<Object> getCurrentValues() throws IOException {
+    public Object getCurrentValue() throws IOException {
       if (localNewApi) {
         try {
-          valueIterator.setValue(newRecordReader.getCurrentValue());
+          return newRecordReader.getCurrentValue();
         } catch (InterruptedException e) {
-          throw new IOException("Interrupted while fetching next value(s)", e);
+          throw new IOException("Interrupted while fetching next value", e);
         }
       } else {
-        valueIterator.setValue(value);
+        return value;
       }
-      return valueIterable;
     }
   };
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 11184e4..f250a8c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -35,7 +35,7 @@ import org.apache.tez.mapreduce.processor.MRTaskReporter;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.TezOutputContext;
-import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
 
 public class MROutput implements LogicalOutput {
 
@@ -235,8 +235,8 @@ public class MROutput implements LogicalOutput {
   }
 
   @Override
-  public KVWriter getWriter() throws IOException {
-    return new KVWriter() {
+  public KeyValueWriter getWriter() throws IOException {
+    return new KeyValueWriter() {
       private final boolean useNewWriter = useNewApi;
 
       @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index 54f6da9..890f1fa 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -48,8 +48,8 @@ import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.TezProcessorContext;
-import org.apache.tez.runtime.library.api.KVReader;
-import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 
 @SuppressWarnings({ "unchecked", "rawtypes" })
@@ -113,7 +113,7 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor
{
       }
     }
 
-    KVWriter kvWriter = null;
+    KeyValueWriter kvWriter = null;
     if (!(out instanceof OnFileSortedOutput)) {
       kvWriter = ((MROutput)out).getWriter();
     } else {
@@ -133,7 +133,7 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor
{
       final JobConf job,
       final MRTaskReporter reporter,
       final MRInputLegacy input,
-      final KVWriter output
+      final KeyValueWriter output
       ) throws IOException, InterruptedException {
 
     // Initialize input in-line since it sets parameters which may be used by the processor.
@@ -156,7 +156,7 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor
{
   private void runNewMapper(final JobConf job,
       MRTaskReporter reporter,
       final MRInputLegacy in,
-      KVWriter out
+      KeyValueWriter out
       ) throws IOException, InterruptedException {
 
     // Initialize input in-line since it sets parameters which may be used by the processor.
@@ -206,7 +206,7 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor
{
   private static class NewRecordReader extends
       org.apache.hadoop.mapreduce.RecordReader {
     private final MRInput in;
-    private KVReader reader;
+    private KeyValueReader reader;
 
     private NewRecordReader(MRInput in) throws IOException {
       this.in = in;
@@ -235,7 +235,7 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor
{
     @Override
     public Object getCurrentValue() throws IOException,
         InterruptedException {
-      return reader.getCurrentValues().iterator().next();
+      return reader.getCurrentValue();
     }
 
     @Override
@@ -299,9 +299,9 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor
{
 
   private static class OldOutputCollector
   implements OutputCollector {
-    private final KVWriter output;
+    private final KeyValueWriter output;
 
-    OldOutputCollector(KVWriter output) {
+    OldOutputCollector(KeyValueWriter output) {
       this.output = output;
     }
 
@@ -312,9 +312,9 @@ public class MapProcessor extends MRTask implements LogicalIOProcessor
{
 
   private class NewOutputCollector
     extends org.apache.hadoop.mapreduce.RecordWriter {
-    private final KVWriter out;
+    private final KeyValueWriter out;
 
-    NewOutputCollector(KVWriter out) throws IOException {
+    NewOutputCollector(KeyValueWriter out) throws IOException {
       this.out = out;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 6c95ba8..0e41b0b 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -45,8 +45,8 @@ import org.apache.tez.runtime.api.LogicalIOProcessor;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.TezProcessorContext;
-import org.apache.tez.runtime.library.api.KVReader;
-import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
@@ -130,9 +130,9 @@ implements LogicalIOProcessor {
       throw new IOException("Illegal input to reduce: " + in.getClass());
     }
     ShuffledMergedInputLegacy shuffleInput = (ShuffledMergedInputLegacy)in;
-    KVReader kvReader = shuffleInput.getReader();
+    KeyValuesReader kvReader = shuffleInput.getReader();
 
-    KVWriter kvWriter = null;
+    KeyValueWriter kvWriter = null;
     if((out instanceof MROutput)) {
       kvWriter = ((MROutput) out).getWriter();
     } else if ((out instanceof OnFileSortedOutput)) {
@@ -162,11 +162,11 @@ implements LogicalIOProcessor {
 
   void runOldReducer(JobConf job,
       final MRTaskReporter reporter,
-      KVReader input,
+      KeyValuesReader input,
       RawComparator comparator,
       Class keyClass,
       Class valueClass,
-      final KVWriter output) throws IOException, InterruptedException {
+      final KeyValueWriter output) throws IOException, InterruptedException {
 
     Reducer reducer =
         ReflectionUtils.newInstance(job.getReducerClass(), job);
@@ -210,12 +210,12 @@ implements LogicalIOProcessor {
   private static class ReduceValuesIterator<KEY,VALUE>
   implements Iterator<VALUE> {
     private Counter reduceInputValueCounter;
-    private KVReader in;
+    private KeyValuesReader in;
     private Progressable reporter;
     private Object currentKey;
     private Iterator<Object> currentValues;
 
-    public ReduceValuesIterator (KVReader in,
+    public ReduceValuesIterator (KeyValuesReader in,
         Progressable reporter,
         Counter reduceInputValueCounter)
             throws IOException {
@@ -268,7 +268,7 @@ implements LogicalIOProcessor {
       RawComparator comparator,
       Class keyClass,
       Class valueClass,
-      final KVWriter out
+      final KeyValueWriter out
       ) throws IOException,InterruptedException,
       ClassNotFoundException {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVReader.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVReader.java
deleted file mode 100644
index fb14e9a..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVReader.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.api;
-
-import java.io.IOException;
-
-import org.apache.tez.runtime.api.Reader;
-
-/**
- * A key/value(s) pair based {@link Reader}.
- * 
- * Example usage
- * <code>
- * while (kvReader.next()) {
- *   Object key =  kvReader.getKey();
- *   Iterable<Object> values = kvReader.getValues();
- * </code>
- *
- */
-public interface KVReader extends Reader {
-
-  /**
-   * Moves to the next key/values(s) pair
-   * 
-   * @return true if another key/value(s) pair exists, false if there are no more.
-   * @throws IOException
-   *           if an error occurs
-   */
-  public boolean next() throws IOException;
-
-  
-  /**
-   * Returns the current key
-   * @return
-   */
-  public Object getCurrentKey() throws IOException;
-  
-  /**
-   * Returns an Iterable view of the values associated with the current key
-   * @return
-   */
-  public Iterable<Object> getCurrentValues() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVWriter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVWriter.java
deleted file mode 100644
index ff952ed..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KVWriter.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.api;
-
-import java.io.IOException;
-
-import org.apache.tez.runtime.api.Writer;
-
-/**
- * A key/value(s) pair based {@link Writer}
- */
-public interface KVWriter extends Writer {
-  /**
-   * Writes a key/value pair.
-   * 
-   * @param key
-   *          the key to write
-   * @param value
-   *          the value to write
-   * @throws IOException
-   *           if an error occurs
-   */
-  public void write(Object key, Object value) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
new file mode 100644
index 0000000..ad327b4
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.api;
+
+import java.io.IOException;
+
+import org.apache.tez.runtime.api.Reader;
+
+/**
+ * A key/value pair based {@link Reader}; each call to next() advances to a single key and value.
+ * 
+ * Example usage:
+ * <pre>{@code
+ * while (kvReader.next()) {
+ *   Object key = kvReader.getCurrentKey();
+ *   Object value = kvReader.getCurrentValue();
+ * }}</pre>
+ *
+ */
+public interface KeyValueReader extends Reader {
+
+  /**
+   * Moves to the next key/value pair.
+   * 
+   * @return true if another key/value pair exists, false if there are no more.
+   * @throws IOException
+   *           if an error occurs
+   */
+  public boolean next() throws IOException;
+
+  
+  /** Returns the key of the current key/value pair.
+   * @return the current key
+   * @throws IOException if an error occurs
+   */
+  public Object getCurrentKey() throws IOException;
+  
+  
+  /** Returns the value of the current key/value pair.
+   * @return the current value
+   * @throws IOException if an error occurs
+   */
+  public Object getCurrentValue() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java
new file mode 100644
index 0000000..235f361
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.api;
+
+import java.io.IOException;
+
+import org.apache.tez.runtime.api.Writer;
+
+/**
+ * A key/value pair based {@link Writer}; each write emits exactly one key and one value.
+ */
+public interface KeyValueWriter extends Writer {
+  /**
+   * Writes a key/value pair.
+   * 
+   * @param key
+   *          the key to write
+   * @param value
+   *          the value to write
+   * @throws IOException
+   *           if an error occurs
+   */
+  public void write(Object key, Object value) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
new file mode 100644
index 0000000..f300e2a
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.api;
+
+import java.io.IOException;
+
+import org.apache.tez.runtime.api.Reader;
+
+/**
+ * A key/values based {@link Reader}: each entry is a key together with all values associated with it.
+ * 
+ * Example usage:
+ * <pre>{@code
+ * while (kvReader.next()) {
+ *   Object key = kvReader.getCurrentKey();
+ *   Iterable<Object> values = kvReader.getCurrentValues();
+ * }}</pre>
+ *
+ */
+public interface KeyValuesReader extends Reader {
+
+  /**
+   * Moves to the next key and its associated values.
+   * 
+   * @return true if another key/values entry exists, false if there are no more.
+   * @throws IOException
+   *           if an error occurs
+   */
+  public boolean next() throws IOException;
+
+  
+  /** Returns the current key.
+   * @return the current key
+   * @throws IOException if an error occurs
+   */
+  public Object getCurrentKey() throws IOException;
+  
+  /** Returns an Iterable view of the values associated with the current key.
+   * @return the values for the current key
+   * @throws IOException if an error occurs
+   */
+  public Iterable<Object> getCurrentValues() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
index 51c0d5e..070f902 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/input/BroadcastKVReader.java
@@ -19,7 +19,6 @@
 package org.apache.tez.runtime.library.broadcast.input;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -30,15 +29,15 @@ import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.runtime.library.api.KVReader;
+import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
-import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
 import org.apache.tez.runtime.library.shuffle.common.FetchedInput.Type;
+import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
 
-public class BroadcastKVReader<K, V> implements KVReader {
+public class BroadcastKVReader<K, V> implements KeyValueReader {
 
   private static final Log LOG = LogFactory.getLog(BroadcastKVReader.class);
   
@@ -53,9 +52,6 @@ public class BroadcastKVReader<K, V> implements KVReader {
   private final DataInputBuffer keyIn;
   private final DataInputBuffer valIn;
 
-  private final SimpleValueIterator valueIterator;
-  private final SimpleIterable valueIterable;
-  
   private K key;
   private V value;
   
@@ -89,9 +85,6 @@ public class BroadcastKVReader<K, V> implements KVReader {
     this.keyDeserializer.open(keyIn);
     this.valDeserializer = serializationFactory.getDeserializer(valClass);
     this.valDeserializer.open(valIn);
-    
-    this.valueIterator = new SimpleValueIterator();
-    this.valueIterable = new SimpleIterable(this.valueIterator);
   }
 
   // TODO NEWTEZ Maybe add an interface to check whether next will block.
@@ -127,11 +120,10 @@ public class BroadcastKVReader<K, V> implements KVReader {
   public Object getCurrentKey() throws IOException {
     return (Object) key;
   }
-  
-  @SuppressWarnings("unchecked")
-  public Iterable<Object> getCurrentValues() throws IOException {
-    this.valueIterator.setValue(value);
-    return (Iterable<Object>) this.valueIterable;
+
+  @Override
+  public Object getCurrentValue() throws IOException {
+    return value;
   }
 
   /**
@@ -194,42 +186,4 @@ public class BroadcastKVReader<K, V> implements KVReader {
           fetchedInput.getSize(), codec, null);
     }
   }
-
-  
-  
-  // TODO NEWTEZ Move this into a common class. Also used in MRInput
-  private class SimpleValueIterator implements Iterator<V> {
-
-    private V value;
-
-    public void setValue(V value) {
-      this.value = value;
-    }
-
-    public boolean hasNext() {
-      return value != null;
-    }
-
-    public V next() {
-      V value = this.value;
-      this.value = null;
-      return value;
-    }
-
-    public void remove() {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  private class SimpleIterable implements Iterable<V> {
-    private final Iterator<V> iterator;
-    public SimpleIterable(Iterator<V> iterator) {
-      this.iterator = iterator;
-    }
-
-    @Override
-    public Iterator<V> iterator() {
-      return iterator;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
index 7d33e63..9941de0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/broadcast/output/FileBasedKVWriter.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.runtime.api.TezOutputContext;
-import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.TezRuntimeUtils;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
@@ -40,7 +40,7 @@ import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
 import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
 import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
 
-public class FileBasedKVWriter implements KVWriter {
+public class FileBasedKVWriter implements KeyValueWriter {
 
   private static final Log LOG = LogFactory.getLog(FileBasedKVWriter.class);
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
index 527f8e1..da152b8 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java
@@ -32,7 +32,7 @@ import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.TezInputContext;
-import org.apache.tez.runtime.library.api.KVReader;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.common.ValuesIterator;
 import org.apache.tez.runtime.library.common.shuffle.impl.Shuffle;
@@ -121,7 +121,7 @@ public class ShuffledMergedInput implements LogicalInput {
    * @return a KVReader over the sorted input.
    */
   @Override
-  public KVReader getReader() throws IOException {
+  public KeyValuesReader getReader() throws IOException {
     if (rawIter == null) {
       try {
         waitForInputReady();
@@ -130,7 +130,7 @@ public class ShuffledMergedInput implements LogicalInput {
         throw new IOException("Interrupted while waiting for input ready", e);
       }
     }
-    return new KVReader() {
+    return new KeyValuesReader() {
 
       @Override
       public boolean next() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
index 40eff70..b6e17b6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java
@@ -28,7 +28,7 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.TezInputContext;
-import org.apache.tez.runtime.library.api.KVReader;
+import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.broadcast.input.BroadcastKVReader;
 import org.apache.tez.runtime.library.broadcast.input.BroadcastShuffleManager;
 
@@ -63,7 +63,7 @@ public class ShuffledUnorderedKVInput implements LogicalInput {
   }
 
   @Override
-  public KVReader getReader() throws Exception {
+  public KeyValueReader getReader() throws Exception {
     return this.kvReader;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java
index 2ec6b2a..2a2872c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/InMemorySortedOutput.java
@@ -27,7 +27,7 @@ import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.Output;
 import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.Writer;
-import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.common.sort.impl.dflt.InMemoryShuffleSorter;
 
 /**
@@ -52,7 +52,7 @@ public class InMemorySortedOutput implements LogicalOutput {
 
   @Override
   public Writer getWriter() throws IOException {
-    return new KVWriter() {
+    return new KeyValueWriter() {
       
       @Override
       public void write(Object key, Object value) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
index 42e1eeb..5415053 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java
@@ -30,7 +30,7 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
 import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
@@ -68,8 +68,8 @@ public class OnFileSortedOutput implements LogicalOutput {
   }
 
   @Override
-  public KVWriter getWriter() throws IOException {
-    return new KVWriter() {
+  public KeyValueWriter getWriter() throws IOException {
+    return new KeyValueWriter() {
       @Override
       public void write(Object key, Object value) throws IOException {
         sorter.write(key, value);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
index 93c00d3..db4085c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java
@@ -29,7 +29,7 @@ import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.broadcast.output.FileBasedKVWriter;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
@@ -55,7 +55,7 @@ public class OnFileUnorderedKVOutput implements LogicalOutput {
   }
 
   @Override
-  public KVWriter getWriter() throws Exception {
+  public KeyValueWriter getWriter() throws Exception {
     return kvWriter;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/034ca0a9/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
index ff9afbd..c7626fd 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java
@@ -48,7 +48,7 @@ import org.apache.tez.runtime.api.TezOutputContext;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
 import org.apache.tez.runtime.api.impl.TezUmbilical;
-import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
 import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
 import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.apache.tez.runtime.library.testutils.KVDataGen;
@@ -125,7 +125,7 @@ public class TestOnFileUnorderedKVOutput {
     events = kvOutput.initialize(outputContext);
     assertTrue(events != null && events.size() == 0);
 
-    KVWriter kvWriter = kvOutput.getWriter();
+    KeyValueWriter kvWriter = kvOutput.getWriter();
     List<KVPair> data = KVDataGen.generateTestData(true);
     for (KVPair kvp : data) {
       kvWriter.write(kvp.getKey(), kvp.getvalue());


Mime
View raw message