beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] beam git commit: [BEAM-2249] Correctly handle partial reads in AvroSource
Date Wed, 10 May 2017 20:00:32 GMT
Repository: beam
Updated Branches:
  refs/heads/release-2.0.0 d6bc8b834 -> 235e1cede


[BEAM-2249] Correctly handle partial reads in AvroSource


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f58ea41d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f58ea41d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f58ea41d

Branch: refs/heads/release-2.0.0
Commit: f58ea41d728bc989e38b4bb039db32342f759c3a
Parents: d6bc8b8
Author: Dan Halperin <dhalperi@google.com>
Authored: Wed May 10 10:11:53 2017 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Wed May 10 13:00:19 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroSource.java | 21 ++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f58ea41d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 61bc4a4..37bbe46 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -59,6 +59,7 @@ import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
 import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream;
 import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;
 import org.apache.commons.compress.utils.CountingInputStream;
+import org.apache.commons.compress.utils.IOUtils;
 
 // CHECKSTYLE.OFF: JavadocStyle
 /**
@@ -124,7 +125,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   // Default minimum bundle size (chosen as two default-size Avro blocks to attempt to
   // ensure that every source has at least one block of records).
   // The default sync interval is 64k.
-  static final long DEFAULT_MIN_BUNDLE_SIZE = 2 * DataFileConstants.DEFAULT_SYNC_INTERVAL;
+  private static final long DEFAULT_MIN_BUNDLE_SIZE = 2 * DataFileConstants.DEFAULT_SYNC_INTERVAL;
 
   // The JSON schema used to encode records.
   private final String readSchemaString;
@@ -663,23 +664,27 @@ public class AvroSource<T> extends BlockBasedSource<T> {
       long headerSize = countStream.getBytesRead() - preHeaderCount;
 
       // Create the current block by reading blockSize bytes. Block sizes permitted by the Avro
-      // specification are [32, 2^30], so this narrowing is ok.
+      // specification are [32, 2^30], so the cast is safe.
       byte[] data = new byte[(int) blockSize];
-      int read = stream.read(data);
-      checkState(blockSize == read, "Only %s/%s bytes in the block were read", read, blockSize);
+      int bytesRead = IOUtils.readFully(stream, data);
+      checkState(
+          blockSize == bytesRead,
+          "Only able to read %s/%s bytes in the block before EOF reached.",
+          bytesRead,
+          blockSize);
       currentBlock = new AvroBlock<>(data, numRecords, getCurrentSource());
 
       // Read the end of this block, which MUST be a sync marker for correctness.
       byte[] syncMarker = getCurrentSource().getSyncMarker();
       byte[] readSyncMarker = new byte[syncMarker.length];
       long syncMarkerOffset = startOfNextBlock + headerSize + blockSize;
-      long bytesRead = stream.read(readSyncMarker);
+      bytesRead = IOUtils.readFully(stream, readSyncMarker);
       checkState(
           bytesRead == syncMarker.length,
-          "When trying to read a sync marker at position %s, only able to read %s/%s bytes",
-          syncMarkerOffset,
+          "Only able to read %s/%s bytes of Avro sync marker at position %s before EOF reached.",
           bytesRead,
-          syncMarker.length);
+          syncMarker.length,
+          syncMarkerOffset);
       if (!Arrays.equals(syncMarker, readSyncMarker)) {
         throw new IllegalStateException(
             String.format(


Mime
View raw message