beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pabl...@apache.org
Subject [beam] branch master updated: Merge pull request #15298 from [BEAM-12729] Suppress Avro Runtime Exceptions for Streaming
Date Mon, 09 Aug 2021 21:38:29 GMT
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d4a9cc  Merge pull request #15298 from [BEAM-12729] Suppress Avro Runtime Exceptions for Streaming
1d4a9cc is described below

commit 1d4a9ccd11c14ac6f0a2de1cc438a881244ede0a
Author: Dylan Hercher <dhercher@google.com>
AuthorDate: Mon Aug 9 14:36:48 2021 -0700

    Merge pull request #15298 from [BEAM-12729] Suppress Avro Runtime Exceptions for Streaming
    
    * add readerThreadCount variable to allow file reads to control threads after the reshuffle occurs
    
    * supply type  inference
    
    * spotless
    
    * chain old constructor for backwards compat
    
    * add nullable checker
    
    * add testing of threads
    
    * use valueof
    
    * adding nullable to the var
    
    * add suppress notifications logic
    
    * add cast to null
    
    * cast to integer
    
    * linting
    
    * test removing num buckets
    
    * use numBuckets
    
    * shift from threading to controlling if reshuffle is used
    
    * spotless
    
    * adding experimental
    
    * rename second test
    
    * fixing checkstyle errors
    
    * spell check
    
    * adding a file exception handler
    
    Co-authored-by: Pablo <pabloem@users.noreply.github.com>
---
 .../main/java/org/apache/beam/sdk/io/AvroIO.java   | 46 ++++++++++++++++++----
 .../beam/sdk/io/ReadAllViaFileBasedSource.java     | 41 +++++++++++++++++--
 2 files changed, 75 insertions(+), 12 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 7c7f4c3..353d822 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment;
+import static org.apache.beam.sdk.io.ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
 
@@ -359,7 +360,8 @@ public class AvroIO {
         .setSchema(ReflectData.get().getSchema(recordClass))
         .setInferBeamSchema(false)
         .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
-        .setUsesReshuffle(DEFAULT_USES_RESHUFFLE)
+        .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+        .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
         .build();
   }
 
@@ -402,7 +404,8 @@ public class AvroIO {
         .setSchema(schema)
         .setInferBeamSchema(false)
         .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
-        .setUsesReshuffle(DEFAULT_USES_RESHUFFLE)
+        .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+        .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
         .build();
   }
 
@@ -474,7 +477,8 @@ public class AvroIO {
     return new AutoValue_AvroIO_ParseFiles.Builder<T>()
         .setParseFn(parseFn)
         .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
-        .setUsesReshuffle(DEFAULT_USES_RESHUFFLE)
+        .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
+        .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
         .build();
   }
 
@@ -582,9 +586,6 @@ public class AvroIO {
    */
   private static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024L;
 
-  /** ReShuffle before avro file reads by default. */
-  private static final boolean DEFAULT_USES_RESHUFFLE = true;
-
   /** Implementation of {@link #read} and {@link #readGenericRecords}. */
   @AutoValue
   public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
@@ -761,6 +762,8 @@ public class AvroIO {
 
     abstract boolean getUsesReshuffle();
 
+    abstract ReadFileRangesFnExceptionHandler getFileExceptionHandler();
+
     abstract long getDesiredBundleSizeBytes();
 
     abstract boolean getInferBeamSchema();
@@ -777,6 +780,9 @@ public class AvroIO {
 
       abstract Builder<T> setUsesReshuffle(boolean usesReshuffle);
 
+      abstract Builder<T> setFileExceptionHandler(
+          ReadFileRangesFnExceptionHandler exceptionHandler);
+
       abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
 
       abstract Builder<T> setInferBeamSchema(boolean infer);
@@ -797,6 +803,13 @@ public class AvroIO {
       return toBuilder().setUsesReshuffle(usesReshuffle).build();
     }
 
+    /** Specifies if exceptions should be logged only for streaming pipelines. */
+    @Experimental(Kind.FILESYSTEM)
+    public ReadFiles<T> withFileExceptionHandler(
+        ReadFileRangesFnExceptionHandler exceptionHandler) {
+      return toBuilder().setFileExceptionHandler(exceptionHandler).build();
+    }
+
     /**
      * If set to true, a Beam schema will be inferred from the AVRO schema. This allows the output
      * to be used by SQL and by the schema-transform library.
@@ -821,7 +834,8 @@ public class AvroIO {
                   new CreateSourceFn<>(
                       getRecordClass(), getSchema().toString(), getDatumReaderFactory()),
                   AvroCoder.of(getRecordClass(), getSchema()),
-                  getUsesReshuffle()));
+                  getUsesReshuffle(),
+                  getFileExceptionHandler()));
       return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read;
     }
 
@@ -1094,6 +1108,8 @@ public class AvroIO {
 
     abstract boolean getUsesReshuffle();
 
+    abstract ReadFileRangesFnExceptionHandler getFileExceptionHandler();
+
     abstract long getDesiredBundleSizeBytes();
 
     abstract Builder<T> toBuilder();
@@ -1106,6 +1122,9 @@ public class AvroIO {
 
       abstract Builder<T> setUsesReshuffle(boolean usesReshuffle);
 
+      abstract Builder<T> setFileExceptionHandler(
+          ReadFileRangesFnExceptionHandler exceptionHandler);
+
       abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
 
       abstract ParseFiles<T> build();
@@ -1122,6 +1141,13 @@ public class AvroIO {
       return toBuilder().setUsesReshuffle(usesReshuffle).build();
     }
 
+    /** Specifies if exceptions should be logged only for streaming pipelines. */
+    @Experimental(Kind.FILESYSTEM)
+    public ParseFiles<T> withFileExceptionHandler(
+        ReadFileRangesFnExceptionHandler exceptionHandler) {
+      return toBuilder().setFileExceptionHandler(exceptionHandler).build();
+    }
+
     @VisibleForTesting
     ParseFiles<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
       return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
@@ -1137,7 +1163,11 @@ public class AvroIO {
       return input.apply(
           "Parse Files via FileBasedSource",
           new ReadAllViaFileBasedSource<>(
-              getDesiredBundleSizeBytes(), createSource, coder, getUsesReshuffle()));
+              getDesiredBundleSizeBytes(),
+              createSource,
+              coder,
+              getUsesReshuffle(),
+              getFileExceptionHandler()));
     }
 
     @Override
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
index 552396f..f21ba97 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
+import java.io.Serializable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
@@ -33,6 +34,8 @@ import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Reads each file in the input {@link PCollection} of {@link ReadableFile} using given parameters
@@ -45,27 +48,38 @@ import org.apache.beam.sdk.values.PCollection;
 @Experimental(Kind.SOURCE_SINK)
 public class ReadAllViaFileBasedSource<T>
     extends PTransform<PCollection<ReadableFile>, PCollection<T>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReadAllViaFileBasedSource.class);
+  protected static final boolean DEFAULT_USES_RESHUFFLE = true;
   private final long desiredBundleSizeBytes;
   private final SerializableFunction<String, ? extends FileBasedSource<T>> createSource;
   private final Coder<T> coder;
+  private final ReadFileRangesFnExceptionHandler exceptionHandler;
   private final boolean usesReshuffle;
 
   public ReadAllViaFileBasedSource(
       long desiredBundleSizeBytes,
       SerializableFunction<String, ? extends FileBasedSource<T>> createSource,
       Coder<T> coder) {
-    this(desiredBundleSizeBytes, createSource, coder, true);
+    this(
+        desiredBundleSizeBytes,
+        createSource,
+        coder,
+        DEFAULT_USES_RESHUFFLE,
+        new ReadFileRangesFnExceptionHandler());
   }
 
   public ReadAllViaFileBasedSource(
       long desiredBundleSizeBytes,
       SerializableFunction<String, ? extends FileBasedSource<T>> createSource,
       Coder<T> coder,
-      boolean usesReshuffle) {
+      boolean usesReshuffle,
+      ReadFileRangesFnExceptionHandler exceptionHandler) {
     this.desiredBundleSizeBytes = desiredBundleSizeBytes;
     this.createSource = createSource;
     this.coder = coder;
     this.usesReshuffle = usesReshuffle;
+    this.exceptionHandler = exceptionHandler;
   }
 
   @Override
@@ -76,7 +90,7 @@ public class ReadAllViaFileBasedSource<T>
       ranges = ranges.apply("Reshuffle", Reshuffle.viaRandomKey());
     }
     return ranges
-        .apply("Read ranges", ParDo.of(new ReadFileRangesFn<>(createSource)))
+        .apply("Read ranges", ParDo.of(new ReadFileRangesFn<T>(createSource, exceptionHandler)))
         .setCoder(coder);
   }
 
@@ -103,10 +117,13 @@ public class ReadAllViaFileBasedSource<T>
 
   private static class ReadFileRangesFn<T> extends DoFn<KV<ReadableFile, OffsetRange>, T> {
     private final SerializableFunction<String, ? extends FileBasedSource<T>> createSource;
+    private final ReadFileRangesFnExceptionHandler exceptionHandler;
 
     private ReadFileRangesFn(
-        SerializableFunction<String, ? extends FileBasedSource<T>> createSource) {
+        SerializableFunction<String, ? extends FileBasedSource<T>> createSource,
+        ReadFileRangesFnExceptionHandler exceptionHandler) {
       this.createSource = createSource;
+      this.exceptionHandler = exceptionHandler;
     }
 
     @ProcessElement
@@ -126,7 +143,23 @@ public class ReadAllViaFileBasedSource<T>
         for (boolean more = reader.start(); more; more = reader.advance()) {
           c.output(reader.getCurrent());
         }
+      } catch (RuntimeException e) {
+        if (exceptionHandler.apply(file, range, e)) {
+          throw e;
+        }
       }
     }
   }
+
+  /** A class to handle errors which occur during file reads. */
+  public static class ReadFileRangesFnExceptionHandler implements Serializable {
+
+    /*
+     * Applies the desired handler logic to the given exception and returns
+     * if the exception should be thrown.
+     */
+    public boolean apply(ReadableFile file, OffsetRange range, Exception e) {
+      return true;
+    }
+  }
 }

Mime
View raw message