beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pabl...@apache.org
Subject [beam] branch master updated: Merge pull request #15246 from [BEAM-12685] Allow managed thread count in AvroIO
Date Sat, 07 Aug 2021 00:57:39 GMT
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ac57388  Merge pull request #15246 from [BEAM-12685] Allow managed thread count in AvroIO
ac57388 is described below

commit ac573882c9b1005c3802552cd6268254293b0002
Author: Dylan Hercher <dhercher@google.com>
AuthorDate: Fri Aug 6 17:56:44 2021 -0700

    Merge pull request #15246 from [BEAM-12685] Allow managed thread count in AvroIO
    
    * add readerThreadCount variable to allow file reads to control threads after the reshuffle occurs
    
    * supply type inference
    
    * spotless
    
    * chain old constructor for backwards compat
    
    * add nullable checker
    
    * add testing of threads
    
    * use valueof
    
    * adding nullable to the var
    
    * add cast to null
    
    * cast to integer
    
    * linting
    
    * test removing num buckets
    
    * use numBuckets
    
    * shift from threading to controlling if reshuffle is used
    
    * spotless
    
    * adding experimental
    
    * rename second test
    
    * fixing checkstyle errors
    
    Co-authored-by: Pablo <pabloem@users.noreply.github.com>
---
 .../main/java/org/apache/beam/sdk/io/AvroIO.java   | 32 ++++++++++++++++++++--
 .../beam/sdk/io/ReadAllViaFileBasedSource.java     | 19 +++++++++++--
 .../java/org/apache/beam/sdk/io/AvroIOTest.java    | 14 ++++++++++
 3 files changed, 60 insertions(+), 5 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 17ecdb1..7c7f4c3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -359,6 +359,7 @@ public class AvroIO {
         .setSchema(ReflectData.get().getSchema(recordClass))
         .setInferBeamSchema(false)
         .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setUsesReshuffle(DEFAULT_USES_RESHUFFLE)
         .build();
   }
 
@@ -401,6 +402,7 @@ public class AvroIO {
         .setSchema(schema)
         .setInferBeamSchema(false)
         .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setUsesReshuffle(DEFAULT_USES_RESHUFFLE)
         .build();
   }
 
@@ -472,6 +474,7 @@ public class AvroIO {
     return new AutoValue_AvroIO_ParseFiles.Builder<T>()
         .setParseFn(parseFn)
         .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
+        .setUsesReshuffle(DEFAULT_USES_RESHUFFLE)
         .build();
   }
 
@@ -579,6 +582,9 @@ public class AvroIO {
    */
   private static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024L;
 
+  /** ReShuffle before avro file reads by default. */
+  private static final boolean DEFAULT_USES_RESHUFFLE = true;
+
   /** Implementation of {@link #read} and {@link #readGenericRecords}. */
   @AutoValue
   public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>
{
@@ -753,6 +759,8 @@ public class AvroIO {
 
     abstract @Nullable Schema getSchema();
 
+    abstract boolean getUsesReshuffle();
+
     abstract long getDesiredBundleSizeBytes();
 
     abstract boolean getInferBeamSchema();
@@ -767,6 +775,8 @@ public class AvroIO {
 
       abstract Builder<T> setSchema(Schema schema);
 
+      abstract Builder<T> setUsesReshuffle(boolean usesReshuffle);
+
       abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
 
       abstract Builder<T> setInferBeamSchema(boolean infer);
@@ -781,6 +791,12 @@ public class AvroIO {
       return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
     }
 
+    /** Specifies if a Reshuffle should run before file reads occur. */
+    @Experimental(Kind.FILESYSTEM)
+    public ReadFiles<T> withUsesReshuffle(boolean usesReshuffle) {
+      return toBuilder().setUsesReshuffle(usesReshuffle).build();
+    }
+
     /**
      * If set to true, a Beam schema will be inferred from the AVRO schema. This allows the
output
      * to be used by SQL and by the schema-transform library.
@@ -804,7 +820,8 @@ public class AvroIO {
                   getDesiredBundleSizeBytes(),
                   new CreateSourceFn<>(
                       getRecordClass(), getSchema().toString(), getDatumReaderFactory()),
-                  AvroCoder.of(getRecordClass(), getSchema())));
+                  AvroCoder.of(getRecordClass(), getSchema()),
+                  getUsesReshuffle()));
       return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) :
read;
     }
 
@@ -1075,6 +1092,8 @@ public class AvroIO {
 
     abstract @Nullable Coder<T> getCoder();
 
+    abstract boolean getUsesReshuffle();
+
     abstract long getDesiredBundleSizeBytes();
 
     abstract Builder<T> toBuilder();
@@ -1085,6 +1104,8 @@ public class AvroIO {
 
       abstract Builder<T> setCoder(Coder<T> coder);
 
+      abstract Builder<T> setUsesReshuffle(boolean usesReshuffle);
+
       abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
 
       abstract ParseFiles<T> build();
@@ -1095,6 +1116,12 @@ public class AvroIO {
       return toBuilder().setCoder(coder).build();
     }
 
+    /** Specifies if a Reshuffle should run before file reads occur. */
+    @Experimental(Kind.FILESYSTEM)
+    public ParseFiles<T> withUsesReshuffle(boolean usesReshuffle) {
+      return toBuilder().setUsesReshuffle(usesReshuffle).build();
+    }
+
     @VisibleForTesting
     ParseFiles<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
       return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
@@ -1109,7 +1136,8 @@ public class AvroIO {
           new CreateParseSourceFn<>(parseFn, coder);
       return input.apply(
           "Parse Files via FileBasedSource",
-          new ReadAllViaFileBasedSource<>(getDesiredBundleSizeBytes(), createSource,
coder));
+          new ReadAllViaFileBasedSource<>(
+              getDesiredBundleSizeBytes(), createSource, coder, getUsesReshuffle()));
     }
 
     @Override
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
index 2c66ad5..552396f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
@@ -48,21 +48,34 @@ public class ReadAllViaFileBasedSource<T>
   private final long desiredBundleSizeBytes;
   private final SerializableFunction<String, ? extends FileBasedSource<T>> createSource;
   private final Coder<T> coder;
+  private final boolean usesReshuffle;
 
   public ReadAllViaFileBasedSource(
       long desiredBundleSizeBytes,
       SerializableFunction<String, ? extends FileBasedSource<T>> createSource,
       Coder<T> coder) {
+    this(desiredBundleSizeBytes, createSource, coder, true);
+  }
+
+  public ReadAllViaFileBasedSource(
+      long desiredBundleSizeBytes,
+      SerializableFunction<String, ? extends FileBasedSource<T>> createSource,
+      Coder<T> coder,
+      boolean usesReshuffle) {
     this.desiredBundleSizeBytes = desiredBundleSizeBytes;
     this.createSource = createSource;
     this.coder = coder;
+    this.usesReshuffle = usesReshuffle;
   }
 
   @Override
   public PCollection<T> expand(PCollection<ReadableFile> input) {
-    return input
-        .apply("Split into ranges", ParDo.of(new SplitIntoRangesFn(desiredBundleSizeBytes)))
-        .apply("Reshuffle", Reshuffle.viaRandomKey())
+    PCollection<KV<ReadableFile, OffsetRange>> ranges =
+        input.apply("Split into ranges", ParDo.of(new SplitIntoRangesFn(desiredBundleSizeBytes)));
+    if (usesReshuffle) {
+      ranges = ranges.apply("Reshuffle", Reshuffle.viaRandomKey());
+    }
+    return ranges
         .apply("Read ranges", ParDo.of(new ReadFileRangesFn<>(createSource)))
         .setCoder(coder);
   }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index dc36685..1663ecf 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -475,6 +475,20 @@ public class AvroIOTest implements Serializable {
                       "ParseFilesGenericRecords",
                       AvroIO.parseFilesGenericRecords(new ParseGenericClass())
                           .withCoder(AvroCoder.of(GenericClass.class))
+                          .withUsesReshuffle(false)
+                          .withDesiredBundleSizeBytes(10)))
+          .containsInAnyOrder(values);
+      PAssert.that(
+              path.apply("MatchAllParseFilesGenericRecordsWithShuffle", FileIO.matchAll())
+                  .apply(
+                      "ReadMatchesParseFilesGenericRecordsWithShuffle",
+                      FileIO.readMatches()
+                          .withDirectoryTreatment(FileIO.ReadMatches.DirectoryTreatment.PROHIBIT))
+                  .apply(
+                      "ParseFilesGenericRecordsWithShuffle",
+                      AvroIO.parseFilesGenericRecords(new ParseGenericClass())
+                          .withCoder(AvroCoder.of(GenericClass.class))
+                          .withUsesReshuffle(true)
                           .withDesiredBundleSizeBytes(10)))
           .containsInAnyOrder(values);
       PAssert.that(

Mime
View raw message