Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B5CF7200C7D for ; Tue, 2 May 2017 03:45:59 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B2CA7160BC2; Tue, 2 May 2017 01:45:59 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C256E160BC5 for ; Tue, 2 May 2017 03:45:58 +0200 (CEST) Received: (qmail 49164 invoked by uid 500); 2 May 2017 01:45:57 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 48399 invoked by uid 99); 2 May 2017 01:45:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 02 May 2017 01:45:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5850AE0885; Tue, 2 May 2017 01:45:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jkff@apache.org To: commits@beam.apache.org Date: Tue, 02 May 2017 01:46:04 -0000 Message-Id: <2ab2688669cc46b99dfb6f4a729f55a7@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [09/11] beam git commit: Adds AvroIO.readGenericRecords() archived-at: Tue, 02 May 2017 01:45:59 -0000 Adds AvroIO.readGenericRecords() Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ff7a1d42 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ff7a1d42 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ff7a1d42 Branch: 
refs/heads/master Commit: ff7a1d42f2902bebdf998d3f00b2b268ba150058 Parents: 1499d25 Author: Eugene Kirpichov Authored: Fri Apr 28 18:36:20 2017 -0700 Committer: Eugene Kirpichov Committed: Mon May 1 18:43:38 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/spark/io/AvroPipelineTest.java | 2 +- .../java/org/apache/beam/sdk/io/AvroIO.java | 31 ++++++++------------ .../java/org/apache/beam/sdk/io/AvroIOTest.java | 8 ++--- .../apache/beam/sdk/io/AvroIOTransformTest.java | 8 ++--- 4 files changed, 19 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ff7a1d42/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java index e3a44d2..62db14f 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java @@ -74,7 +74,7 @@ public class AvroPipelineTest { Pipeline p = pipelineRule.createPipeline(); PCollection input = p.apply( - AvroIO.read().from(inputFile.getAbsolutePath()).withSchema(schema)); + AvroIO.readGenericRecords(schema).from(inputFile.getAbsolutePath())); input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema)); p.run().waitUntilFinish(); http://git-wip-us.apache.org/repos/asf/beam/blob/ff7a1d42/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index abde9cb..ed172d1 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -133,6 +133,18 @@ public class AvroIO { return new Read<>(); } + /** Reads Avro file(s) containing records of the specified schema. */ + public static Read readGenericRecords(Schema schema) { + return new Read<>(null, null, GenericRecord.class, schema); + } + + /** + * Like {@link #readGenericRecords(Schema)} but the schema is specified as a JSON-encoded string. + */ + public static Read readGenericRecords(String schema) { + return readGenericRecords(new Schema.Parser().parse(schema)); + } + /** Implementation of {@link #read}. */ public static class Read extends PTransform> { /** The filepattern to read from. */ @@ -178,25 +190,6 @@ public class AvroIO { return new Read<>(name, filepattern, type, ReflectData.get().getSchema(type)); } - /** - * Returns a new {@link PTransform} that's like this one but - * that reads Avro file(s) containing records of the specified schema. - */ - public Read withSchema(Schema schema) { - return new Read<>(name, filepattern, GenericRecord.class, schema); - } - - /** - * Returns a new {@link PTransform} that's like this one but - * that reads Avro file(s) containing records of the specified schema - * in a JSON-encoded string form. - * - *

<p>Does not modify this object. - */ - public Read withSchema(String schema) { - return withSchema((new Schema.Parser()).parse(schema)); - } - @Override public PCollection expand(PBegin input) { if (filepattern == null) { http://git-wip-us.apache.org/repos/asf/beam/blob/ff7a1d42/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 6d842b3..2144b0d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -282,10 +282,6 @@ public class AvroIOTest { p.run(); } - private TimestampedValue newValue(GenericClass element, Duration duration) { - return TimestampedValue.of(element, new Instant(0).plus(duration)); - } - private static class WindowedFilenamePolicy extends FilenamePolicy { String outputFilePrefix; @@ -550,8 +546,8 @@ public class AvroIOTest { public void testPrimitiveReadDisplayData() { DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); - AvroIO.Read read = AvroIO.read().from("foo.*") - .withSchema(Schema.create(Schema.Type.STRING)); + AvroIO.Read read = + AvroIO.readGenericRecords(Schema.create(Schema.Type.STRING)).from("foo.*"); Set displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); assertThat("AvroIO.Read should include the file pattern in its primitive transform", http://git-wip-us.apache.org/repos/asf/beam/blob/ff7a1d42/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java index 06b9841..b974663 100644 --- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTransformTest.java @@ -185,14 +185,14 @@ public class AvroIOTransformTest { // test read using schema object new Object[] { null, - AvroIO.read().withSchema(SCHEMA), + AvroIO.readGenericRecords(SCHEMA), "AvroIO.Read/Read.out", generateAvroGenericRecords(), fromSchema }, new Object[] { "MyRead", - AvroIO.read().withSchema(SCHEMA), + AvroIO.readGenericRecords(SCHEMA), "MyRead/Read.out", generateAvroGenericRecords(), fromSchema @@ -201,14 +201,14 @@ public class AvroIOTransformTest { // test read using schema string new Object[] { null, - AvroIO.read().withSchema(SCHEMA_STRING), + AvroIO.readGenericRecords(SCHEMA_STRING), "AvroIO.Read/Read.out", generateAvroGenericRecords(), fromSchemaString }, new Object[] { "MyRead", - AvroIO.read().withSchema(SCHEMA_STRING), + AvroIO.readGenericRecords(SCHEMA_STRING), "MyRead/Read.out", generateAvroGenericRecords(), fromSchemaString