tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rbalamo...@apache.org
Subject tez git commit: TEZ-2256. Avoid use of BufferTooSmallException to signal end of buffer in UnorderedPartitionedKVWriter. (Contributed by Cyrille Chépélov)
Date Wed, 29 Apr 2015 23:19:02 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.6 c5f57ccf6 -> 8fbd0ba84


TEZ-2256. Avoid use of BufferTooSmallException to signal end of buffer in UnorderedPartitionedKVWriter.
(Contributed by Cyrille Chépélov)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8fbd0ba8
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8fbd0ba8
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8fbd0ba8

Branch: refs/heads/branch-0.6
Commit: 8fbd0ba842293713da27ca8fa39e2376398f9645
Parents: c5f57cc
Author: Rajesh Balamohan <rbalamohan@apache.org>
Authored: Thu Apr 30 04:48:40 2015 +0530
Committer: Rajesh Balamohan <rbalamohan@apache.org>
Committed: Thu Apr 30 04:48:40 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../writers/UnorderedPartitionedKVWriter.java   | 31 +++++++++++---------
 2 files changed, 18 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/8fbd0ba8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9067c86..129f0f7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.6.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2256. Avoid use of BufferTooSmallException to signal end of buffer in UnorderedPartitionedKVWriter.
   TEZ-2385. branch-0.6 compile failure caused by TEZ-2226.
   TEZ-2380. Disable fall back to reading from timeline if timeline disabled.
   TEZ-2226. Disable writing history to timeline if domain creation fails.

http://git-wip-us.apache.org/repos/asf/tez/blob/8fbd0ba8/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 1ba00a0..1203a5e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -199,7 +199,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     // Wrap to 4 byte (Int) boundary for metaData
     int mod = currentBuffer.nextPosition % INT_SIZE;
     int metaSkip = mod == 0 ? 0 : (INT_SIZE - mod);
-    if (currentBuffer.availableSize < (META_SIZE + metaSkip)) {
+    if ((currentBuffer.availableSize < (META_SIZE + metaSkip)) || (currentBuffer.full)) {
       // Move over to the next buffer.
       metaSkip = 0;
       setupNextBuffer();
@@ -208,9 +208,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     int metaStart = currentBuffer.nextPosition;
     currentBuffer.availableSize -= (META_SIZE + metaSkip);
     currentBuffer.nextPosition += META_SIZE;
-    try {
-      keySerializer.serialize(key);
-    } catch (BufferTooSmallException e) {
+    
+    keySerializer.serialize(key);
+
+    if (currentBuffer.full) {
       if (metaStart == 0) { // Started writing at the start of the buffer. Write Key to disk.
         // Key too large for any buffer. Write entire record to disk.
         currentBuffer.reset();
@@ -224,10 +225,12 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
         return;
       }
     }
+
+
     int valStart = currentBuffer.nextPosition;
-    try {
-      valSerializer.serialize(value);
-    } catch (BufferTooSmallException e) {
+    valSerializer.serialize(value);
+      
+    if (currentBuffer.full) {
       // Value too large for current buffer, or K-V too large for entire buffer.
       if (metaStart == 0) {
         // Key + Value too large for a single buffer.
@@ -647,8 +650,10 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     }
 
     public void write(byte[] b, int off, int len) throws IOException {
-      if (len > currentBuffer.availableSize) {
-        throw new BufferTooSmallException();
+      if (currentBuffer.full) {
+          /* no longer do anything until reset */
+      } else if (len > currentBuffer.availableSize) {
+        currentBuffer.full = true; /* stop working & signal we hit the end */
       } else {
         System.arraycopy(b, off, currentBuffer.buffer, currentBuffer.nextPosition, len);
         currentBuffer.nextPosition += len;
@@ -674,6 +679,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
 
     private int nextPosition = 0;
     private int availableSize;
+    private boolean full = false;
 
     WrappedBuffer(int numPartitions, int size) {
       this.partitionPositions = new int[numPartitions];
@@ -699,6 +705,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       nextPosition = 0;
       skipSize = 0;
       availableSize = size;
+      full = false;
     }
 
     void cleanup() {
@@ -707,10 +714,6 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     }
   }
 
-  private static class BufferTooSmallException extends IOException {
-    private static final long serialVersionUID = 1L;
-  }
-
   private class SpillCallback implements FutureCallback<SpillResult> {
 
     private final int spillNumber;
@@ -790,4 +793,4 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     int shufflePort = ShuffleUtils.deserializeShuffleProviderMetaData(shuffleMetadata);
     return shufflePort;
   }
-}
\ No newline at end of file
+}


Mime
View raw message