tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-2256. Avoid use of BufferTooSmallException to signal end of buffer in UnorderedPartitionedKVWriter. (Contributed by Cyrille Chépélov)
Date Wed, 29 Apr 2015 23:13:06 GMT
Repository: tez
Updated Branches:
  refs/heads/master 6b6834e82 -> 1f6b474cb


TEZ-2256. Avoid use of BufferTooSmallException to signal end of buffer in UnorderedPartitionedKVWriter.
(Contributed by Cyrille Chépélov)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/1f6b474c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/1f6b474c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/1f6b474c

Branch: refs/heads/master
Commit: 1f6b474cb70e0c0781a0a8951f872812fc264bbb
Parents: 6b6834e
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Thu Apr 30 04:42:50 2015 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Thu Apr 30 04:42:50 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../writers/UnorderedPartitionedKVWriter.java   | 29 +++++++++++---------
 2 files changed, 17 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/1f6b474c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6e7eb7b..940d257 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-2256. Avoid use of BufferTooSmallException to signal end of buffer in UnorderedPartitionedKVWriter
   TEZ-2084. Tez UI: Stacktrace format info is lost in diagnostics
   TEZ-2374. Fix build break against hadoop-2.2 due to TEZ-2325.
   TEZ-2314. Tez task attempt failures due to bad event serialization

http://git-wip-us.apache.org/repos/asf/tez/blob/1f6b474c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 7aac6f9..37d8be6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -250,7 +250,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     // Wrap to 4 byte (Int) boundary for metaData
     int mod = currentBuffer.nextPosition % INT_SIZE;
     int metaSkip = mod == 0 ? 0 : (INT_SIZE - mod);
-    if (currentBuffer.availableSize < (META_SIZE + metaSkip)) {
+    if ((currentBuffer.availableSize < (META_SIZE + metaSkip)) || (currentBuffer.full)) {
       // Move over to the next buffer.
       metaSkip = 0;
       setupNextBuffer();
@@ -259,9 +259,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     int metaStart = currentBuffer.nextPosition;
     currentBuffer.availableSize -= (META_SIZE + metaSkip);
     currentBuffer.nextPosition += META_SIZE;
-    try {
-      keySerializer.serialize(key);
-    } catch (BufferTooSmallException e) {
+    
+    keySerializer.serialize(key);
+
+    if (currentBuffer.full) {
       if (metaStart == 0) { // Started writing at the start of the buffer. Write Key to disk.
         // Key too large for any buffer. Write entire record to disk.
         currentBuffer.reset();
@@ -275,10 +276,12 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
         return;
       }
     }
+
+
     int valStart = currentBuffer.nextPosition;
-    try {
-      valSerializer.serialize(value);
-    } catch (BufferTooSmallException e) {
+    valSerializer.serialize(value);
+      
+    if (currentBuffer.full) {
       // Value too large for current buffer, or K-V too large for entire buffer.
       if (metaStart == 0) {
         // Key + Value too large for a single buffer.
@@ -824,8 +827,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     }
 
     public void write(byte[] b, int off, int len) throws IOException {
-      if (len > currentBuffer.availableSize) {
-        throw new BufferTooSmallException();
+      if (currentBuffer.full) {
+          /* no longer do anything until reset */
+      } else if (len > currentBuffer.availableSize) {
+        currentBuffer.full = true; /* stop working & signal we hit the end */
       } else {
         System.arraycopy(b, off, currentBuffer.buffer, currentBuffer.nextPosition, len);
         currentBuffer.nextPosition += len;
@@ -851,6 +856,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
     private int nextPosition = 0;
     private int availableSize;
+    private boolean full = false;
 
     WrappedBuffer(int numPartitions, int size) {
       this.partitionPositions = new int[numPartitions];
@@ -876,6 +882,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       nextPosition = 0;
       skipSize = 0;
       availableSize = size;
+      full = false;
     }
 
     void cleanup() {
@@ -884,10 +891,6 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     }
   }
 
-  private static class BufferTooSmallException extends IOException {
-    private static final long serialVersionUID = 1L;
-  }
-
   private void sendPipelinedEventForSpill(BitSet emptyPartitions, int spillNumber, boolean isFinalUpdate) {
     if (!pipelinedShuffle) {
       return;


Mime
View raw message