beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/4] beam git commit: Introduces AvroIO.readAll() and readAllGenericRecords()
Date Wed, 26 Jul 2017 00:51:50 GMT
Repository: beam
Updated Branches:
  refs/heads/master 71196ec9c -> d919394c7


Introduces AvroIO.readAll() and readAllGenericRecords()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee1bcbae
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee1bcbae
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee1bcbae

Branch: refs/heads/master
Commit: ee1bcbae08fb221e392175fbd0387594653d4a86
Parents: eaf0b36
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Fri Jul 21 14:09:35 2017 -0700
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Tue Jul 25 17:36:49 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 132 +++++++++++++++++--
 .../java/org/apache/beam/sdk/io/AvroUtils.java  |  40 ++++++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  65 ++++++++-
 3 files changed, 223 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ee1bcbae/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index d308c85..f201114 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -21,6 +21,8 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.io.BaseEncoding;
@@ -54,14 +56,17 @@ import org.apache.beam.sdk.values.PDone;
  * {@link PTransform}s for reading and writing Avro files.
  *
 * <p>To read a {@link PCollection} from one or more Avro files, use {@code AvroIO.read()}, using
- * {@link AvroIO.Read#from} to specify the filename or filepattern to read from. See {@link
- * FileSystems} for information on supported file systems and filepatterns.
+ * {@link AvroIO.Read#from} to specify the filename or filepattern to read from. Alternatively, if
+ * the filepatterns to be read are themselves in a {@link PCollection}, apply {@link #readAll}.
+ *
+ * <p>See {@link FileSystems} for information on supported file systems and filepatterns.
  *
 * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
 * {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a
 * {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
  * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
- * schema.
+ * schema. Likewise, to read a {@link PCollection} of filepatterns, apply {@link
+ * #readAllGenericRecords}.
  *
  * <p>For example:
  *
@@ -79,6 +84,18 @@ import org.apache.beam.sdk.values.PDone;
  *                .from("gs://my_bucket/path/to/records-*.avro"));
  * }</pre>
  *
+ * <p>Reading from a {@link PCollection} of filepatterns:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<String> filepatterns = p.apply(...);
+ * PCollection<AvroAutoGenClass> records =
+ *     filepatterns.apply(AvroIO.read(AvroAutoGenClass.class));
+ * PCollection<GenericRecord> genericRecords =
+ *     filepatterns.apply(AvroIO.readGenericRecords(schema));
+ * }</pre>
+ *
 * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using
  * {@code AvroIO.write().to(String)} to specify the output filename prefix. The default {@link
 * DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set
@@ -133,6 +150,18 @@ public class AvroIO {
         .build();
   }
 
+  /** Like {@link #read}, but reads each filepattern in the input {@link PCollection}. */
+  public static <T> ReadAll<T> readAll(Class<T> recordClass) {
+    return new AutoValue_AvroIO_ReadAll.Builder<T>()
+        .setRecordClass(recordClass)
+        .setSchema(ReflectData.get().getSchema(recordClass))
+        // 64MB is a reasonable value that allows to amortize the cost of opening files,
+        // but is not so large as to exhaust a typical runner's maximum amount of output per
+        // ProcessElement call.
+        .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
+        .build();
+  }
+
   /** Reads Avro file(s) containing records of the specified schema. */
   public static Read<GenericRecord> readGenericRecords(Schema schema) {
     return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
@@ -142,6 +171,17 @@ public class AvroIO {
   }
 
   /**
+   * Like {@link #readGenericRecords(Schema)}, but reads each filepattern in the input {@link
+   * PCollection}.
+   */
+  public static ReadAll<GenericRecord> readAllGenericRecords(Schema schema) {
+    return new AutoValue_AvroIO_ReadAll.Builder<GenericRecord>()
+        .setRecordClass(GenericRecord.class)
+        .setSchema(schema)
+        .build();
+  }
+
+  /**
   * Reads Avro file(s) containing records of the specified schema. The schema is specified as a
    * JSON-encoded string.
    */
@@ -150,6 +190,14 @@ public class AvroIO {
   }
 
   /**
+   * Like {@link #readGenericRecords(String)}, but reads each filepattern in the input {@link
+   * PCollection}.
+   */
+  public static ReadAll<GenericRecord> readAllGenericRecords(String schema) {
+    return readAllGenericRecords(new Schema.Parser().parse(schema));
+  }
+
+  /**
    * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding
    * pattern).
    */
@@ -217,14 +265,12 @@ public class AvroIO {
     public PCollection<T> expand(PBegin input) {
       checkNotNull(getFilepattern(), "filepattern");
       checkNotNull(getSchema(), "schema");
-
-      @SuppressWarnings("unchecked")
-      AvroSource<T> source =
-          getRecordClass() == GenericRecord.class
-              ? (AvroSource<T>) AvroSource.from(getFilepattern()).withSchema(getSchema())
-              : AvroSource.from(getFilepattern()).withSchema(getRecordClass());
-
-      return input.getPipeline().apply("Read", org.apache.beam.sdk.io.Read.from(source));
+      return input
+          .getPipeline()
+          .apply(
+              "Read",
+              org.apache.beam.sdk.io.Read.from(
+                  createSource(getFilepattern(), getRecordClass(), getSchema())));
     }
 
     @Override
@@ -233,6 +279,70 @@ public class AvroIO {
       builder.addIfNotNull(
           DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"));
     }
+
+    @SuppressWarnings("unchecked")
+    private static <T> AvroSource<T> createSource(
+        ValueProvider<String> filepattern, Class<T> recordClass, Schema schema) {
+      return recordClass == GenericRecord.class
+          ? (AvroSource<T>) AvroSource.from(filepattern).withSchema(schema)
+          : AvroSource.from(filepattern).withSchema(recordClass);
+    }
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Implementation of {@link #readAll}. */
+  @AutoValue
+  public abstract static class ReadAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
+    @Nullable abstract Class<T> getRecordClass();
+    @Nullable abstract Schema getSchema();
+    abstract long getDesiredBundleSizeBytes();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setRecordClass(Class<T> recordClass);
+      abstract Builder<T> setSchema(Schema schema);
+      abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
+
+      abstract ReadAll<T> build();
+    }
+
+    @VisibleForTesting
+    ReadAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
+      return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PCollection<String> input) {
+      checkNotNull(getSchema(), "schema");
+      return input
+          .apply(
+              "Read all via FileBasedSource",
+              new ReadAllViaFileBasedSource<>(
+                  SerializableFunctions.<String, Boolean>constant(true) /* isSplittable */,
+                  getDesiredBundleSizeBytes(),
+                  new CreateSourceFn<>(getRecordClass(), getSchema().toString())))
+          .setCoder(AvroCoder.of(getRecordClass(), getSchema()));
+    }
+  }
+
+  private static class CreateSourceFn<T>
+      implements SerializableFunction<String, FileBasedSource<T>> {
+    private final Class<T> recordClass;
+    private final Supplier<Schema> schemaSupplier;
+
+    public CreateSourceFn(Class<T> recordClass, String jsonSchema) {
+      this.recordClass = recordClass;
+      this.schemaSupplier = AvroUtils.serializableSchemaSupplier(jsonSchema);
+    }
+
+    @Override
+    public FileBasedSource<T> apply(String input) {
+      return Read.createSource(
+          StaticValueProvider.of(input), recordClass, schemaSupplier.get());
+    }
   }
 
   /////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/ee1bcbae/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroUtils.java
new file mode 100644
index 0000000..65c5bf1
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import java.io.Serializable;
+import org.apache.avro.Schema;
+
+/** Helpers for working with Avro. */
+class AvroUtils {
+  /** Helper to get around the fact that {@link Schema} itself is not serializable. */
+  public static Supplier<Schema> serializableSchemaSupplier(String jsonSchema) {
+    return Suppliers.memoize(
+        Suppliers.compose(new JsonToSchema(), Suppliers.ofInstance(jsonSchema)));
+  }
+
+  private static class JsonToSchema implements Function<String, Schema>, Serializable {
+    @Override
+    public Schema apply(String input) {
+      return new Schema.Parser().parse(input);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/ee1bcbae/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 4380c57..df5d26c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -30,6 +30,7 @@ import static org.junit.Assert.assertTrue;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import java.io.File;
@@ -152,10 +153,68 @@ public class AvroIOTest {
         .apply(AvroIO.write(GenericClass.class).to(outputFile.getAbsolutePath()).withoutSharding());
     writePipeline.run().waitUntilFinish();
 
-    PCollection<GenericClass> input =
-        readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath()));
+    // Test both read() and readAll()
+    PAssert.that(
+            readPipeline.apply(AvroIO.read(GenericClass.class).from(outputFile.getAbsolutePath())))
+        .containsInAnyOrder(values);
+    PAssert.that(
+            readPipeline
+                .apply(Create.of(outputFile.getAbsolutePath()))
+                .apply(AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+        .containsInAnyOrder(values);
+
+    readPipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testAvroIOWriteAndReadMultipleFilepatterns() throws Throwable {
+    List<GenericClass> firstValues = Lists.newArrayList();
+    List<GenericClass> secondValues = Lists.newArrayList();
+    for (int i = 0; i < 10; ++i) {
+      firstValues.add(new GenericClass(i, "a" + i));
+      secondValues.add(new GenericClass(i, "b" + i));
+    }
+    writePipeline
+        .apply("Create first", Create.of(firstValues))
+        .apply(
+            "Write first",
+            AvroIO.write(GenericClass.class)
+                .to(tmpFolder.getRoot().getAbsolutePath() + "/first")
+                .withNumShards(2));
+    writePipeline
+        .apply("Create second", Create.of(secondValues))
+        .apply(
+            "Write second",
+            AvroIO.write(GenericClass.class)
+                .to(tmpFolder.getRoot().getAbsolutePath() + "/second")
+                .withNumShards(3));
+    writePipeline.run().waitUntilFinish();
+
+    // Test both read() and readAll()
+    PAssert.that(
+            readPipeline.apply(
+                "Read first",
+                AvroIO.read(GenericClass.class)
+                    .from(tmpFolder.getRoot().getAbsolutePath() + "/first*")))
+        .containsInAnyOrder(firstValues);
+    PAssert.that(
+            readPipeline.apply(
+                "Read second",
+                AvroIO.read(GenericClass.class)
+                    .from(tmpFolder.getRoot().getAbsolutePath() + "/second*")))
+        .containsInAnyOrder(secondValues);
+    PAssert.that(
+            readPipeline
+                .apply(
+                    "Create paths",
+                    Create.of(
+                        tmpFolder.getRoot().getAbsolutePath() + "/first*",
+                        tmpFolder.getRoot().getAbsolutePath() + "/second*"))
+                .apply(
+                    "Read all", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+        .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
 
-    PAssert.that(input).containsInAnyOrder(values);
     readPipeline.run();
   }
 


Mime
View raw message