hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject hadoop git commit: HADOOP-15206. BZip2 drops and duplicates records when input split size is small. Contributed by Aki Tanaka
Date Fri, 16 Feb 2018 21:08:29 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 904a6bf26 -> b1d5e73c1


HADOOP-15206. BZip2 drops and duplicates records when input split size is small. Contributed
by Aki Tanaka

(cherry picked from commit 0898ff42e9e5c53f2fce7ccdeb4e1cd7d0f123b3)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b1d5e73c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b1d5e73c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b1d5e73c

Branch: refs/heads/branch-2
Commit: b1d5e73c1ffb87323c4deddafb1f96e92bfb9093
Parents: 904a6bf
Author: Jason Lowe <jlowe@apache.org>
Authored: Fri Feb 16 14:49:00 2018 -0600
Committer: Jason Lowe <jlowe@apache.org>
Committed: Fri Feb 16 15:00:10 2018 -0600

----------------------------------------------------------------------
 .../apache/hadoop/io/compress/BZip2Codec.java   | 30 +++++++++++++++++++-
 .../hadoop/mapred/TestTextInputFormat.java      |  8 ++++++
 2 files changed, 37 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b1d5e73c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
index db78118..3c78cfc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/BZip2Codec.java
@@ -362,9 +362,29 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
       bufferedIn = new BufferedInputStream(super.in);
       this.startingPos = super.getPos();
       this.readMode = readMode;
+      long numSkipped = 0;
       if (this.startingPos == 0) {
         // We only strip header if it is start of file
         bufferedIn = readStreamHeader();
+      } else if (this.readMode == READ_MODE.BYBLOCK  &&
+          this.startingPos <= HEADER_LEN + SUB_HEADER_LEN) {
+        // When we're in BYBLOCK mode and the start position is >=0
+        // and < HEADER_LEN + SUB_HEADER_LEN, we should skip to after
+        // start of the first bz2 block to avoid duplicated records
+        numSkipped = HEADER_LEN + SUB_HEADER_LEN + 1 - this.startingPos;
+        long skipBytes = numSkipped;
+        while (skipBytes > 0) {
+          long s = bufferedIn.skip(skipBytes);
+          if (s > 0) {
+            skipBytes -= s;
+          } else {
+            if (bufferedIn.read() == -1) {
+              break; // end of the split
+            } else {
+              skipBytes--;
+            }
+          }
+        }
       }
       input = new CBZip2InputStream(bufferedIn, readMode);
       if (this.isHeaderStripped) {
@@ -375,7 +395,15 @@ public class BZip2Codec implements Configurable, SplittableCompressionCodec {
         input.updateReportedByteCount(SUB_HEADER_LEN);
       }
 
-      this.updatePos(false);
+      if (numSkipped > 0) {
+        input.updateReportedByteCount((int) numSkipped);
+      }
+
+      // To avoid dropped records, not advertising a new byte position
+      // when we are in BYBLOCK mode and the start position is 0
+      if (!(this.readMode == READ_MODE.BYBLOCK && this.startingPos == 0)) {
+        this.updatePos(false);
+      }
     }
 
     private BufferedInputStream readStreamHeader() throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b1d5e73c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
index 220d4a9..a7e6832 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTextInputFormat.java
@@ -184,6 +184,14 @@ public class TestTextInputFormat {
     // corner case when we have byte alignment and position of stream are same
     verifyPartitions(471507, 218, file, codec, conf);
     verifyPartitions(473608, 110, file, codec, conf);
+
+    // corner case when split size is small and position of stream is before
+    // the first BZip2 block
+    verifyPartitions(100, 20, file, codec, conf);
+    verifyPartitions(100, 25, file, codec, conf);
+    verifyPartitions(100, 30, file, codec, conf);
+    verifyPartitions(100, 50, file, codec, conf);
+    verifyPartitions(100, 100, file, codec, conf);
   }
 
   // Test a corner case when position of stream is right after BZip2 marker


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message