Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AE93F200D0E for ; Tue, 12 Sep 2017 00:38:22 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id AD4081609C6; Mon, 11 Sep 2017 22:38:22 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 57F881609C4 for ; Tue, 12 Sep 2017 00:38:20 +0200 (CEST) Received: (qmail 66880 invoked by uid 500); 11 Sep 2017 22:38:19 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 66871 invoked by uid 99); 11 Sep 2017 22:38:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Sep 2017 22:38:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A7001F57BF; Mon, 11 Sep 2017 22:38:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jkff@apache.org To: commits@beam.apache.org Date: Mon, 11 Sep 2017 22:38:17 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/3] beam git commit: Cleans up PTransform validation across Beam archived-at: Mon, 11 Sep 2017 22:38:22 -0000 Repository: beam Updated Branches: refs/heads/master 98b0fd2b4 -> e8bf045f6 http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java index 6eb1a33..ad4e47b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java @@ -84,75 +84,6 @@ public class SpannerIOReadTest implements Serializable { } @Test - public void emptyTransform() throws Exception { - SpannerIO.Read read = SpannerIO.read(); - thrown.expect(NullPointerException.class); - thrown.expectMessage("requires instance id to be set with"); - read.validate(null); - } - - @Test - public void emptyInstanceId() throws Exception { - SpannerIO.Read read = SpannerIO.read().withDatabaseId("123"); - thrown.expect(NullPointerException.class); - thrown.expectMessage("requires instance id to be set with"); - read.validate(null); - } - - @Test - public void emptyDatabaseId() throws Exception { - SpannerIO.Read read = SpannerIO.read().withInstanceId("123"); - thrown.expect(NullPointerException.class); - thrown.expectMessage("requires database id to be set with"); - read.validate(null); - } - - @Test - public void emptyQuery() throws Exception { - SpannerIO.Read read = - SpannerIO.read().withInstanceId("123").withDatabaseId("aaa").withTimestamp(Timestamp.now()); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("requires configuring query or read operation"); - read.validate(null); - } - - @Test - public void emptyColumns() throws Exception { - SpannerIO.Read read = - SpannerIO.read() - .withInstanceId("123") - .withDatabaseId("aaa") - .withTimestamp(Timestamp.now()) - .withTable("users"); - thrown.expect(NullPointerException.class); - thrown.expectMessage("requires a list of columns"); - read.validate(null); - } - - @Test - public void validRead() throws Exception { - SpannerIO.Read read = - SpannerIO.read() - .withInstanceId("123") - .withDatabaseId("aaa") - .withTimestamp(Timestamp.now()) - .withTable("users") - .withColumns("id", "name", "email"); - read.validate(null); - } - - @Test - public void validQuery() throws Exception { - SpannerIO.Read read = - SpannerIO.read() - .withInstanceId("123") - .withDatabaseId("aaa") - .withTimestamp(Timestamp.now()) - .withQuery("SELECT * FROM users"); - read.validate(null); - } - - @Test public void runQuery() throws Exception { SpannerIO.Read read = SpannerIO.read() http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java index 09cdb8e..53783d1 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java @@ -66,7 +66,7 @@ public class SpannerIOWriteTest implements Serializable { SpannerIO.Write write = SpannerIO.write(); thrown.expect(NullPointerException.class); thrown.expectMessage("requires instance id to be set with"); - write.validate(null); + write.expand(null); } @Test @@ -74,7 +74,7 @@ public class SpannerIOWriteTest implements Serializable { SpannerIO.Write write = SpannerIO.write().withDatabaseId("123"); thrown.expect(NullPointerException.class); thrown.expectMessage("requires instance id to be set with"); - write.validate(null); + write.expand(null); } @Test @@ -82,7 +82,7 @@ public class SpannerIOWriteTest implements Serializable { SpannerIO.Write write = SpannerIO.write().withInstanceId("123"); thrown.expect(NullPointerException.class); thrown.expectMessage("requires database id to be set with"); - write.validate(null); + write.expand(null); } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/hadoop/input-format/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/input-format/pom.xml b/sdks/java/io/hadoop/input-format/pom.xml index 0953119..910a009 100644 --- a/sdks/java/io/hadoop/input-format/pom.xml +++ b/sdks/java/io/hadoop/input-format/pom.xml @@ -27,15 +27,10 @@ Apache Beam :: SDKs :: Java :: IO :: Hadoop :: input-format IO to read data from data sources which implement Hadoop Input Format. - - 19.0 - - com.google.guava guava - ${guava.version} org.slf4j http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java index 20ca50a..89df555 100644 --- a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java +++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java @@ -15,7 +15,6 @@ package org.apache.beam.sdk.io.hadoop.inputformat; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; @@ -219,12 +218,7 @@ public class HadoopInputFormatIO { abstract Read build(); } - /** - * Returns a new {@link HadoopInputFormatIO.Read} that will read from the source using the - * options provided by the given configuration. - * - *

Does not modify this object. - */ + /** Reads from the source using the options provided by the given configuration. */ public Read withConfiguration(Configuration configuration) { validateConfiguration(configuration); TypeDescriptor inputFormatClass = @@ -255,27 +249,17 @@ public class HadoopInputFormatIO { return builder.build(); } - /** - * Returns a new {@link HadoopInputFormatIO.Read} that will transform the keys read from the - * source using the given key translation function. - * - *

Does not modify this object. - */ + /** Transforms the keys read from the source using the given key translation function. */ public Read withKeyTranslation(SimpleFunction function) { - checkNotNull(function, "function"); + checkArgument(function != null, "function can not be null"); // Sets key class to key translation function's output class type. return toBuilder().setKeyTranslationFunction(function) .setKeyTypeDescriptor((TypeDescriptor) function.getOutputTypeDescriptor()).build(); } - /** - * Returns a new {@link HadoopInputFormatIO.Read} that will transform the values read from the - * source using the given value translation function. - * - *

Does not modify this object. - */ + /** Transforms the values read from the source using the given value translation function. */ public Read withValueTranslation(SimpleFunction function) { - checkNotNull(function, "function"); + checkArgument(function != null, "function can not be null"); // Sets value class to value translation function's output class type. return toBuilder().setValueTranslationFunction(function) .setValueTypeDescriptor((TypeDescriptor) function.getOutputTypeDescriptor()).build(); @@ -302,12 +286,14 @@ public class HadoopInputFormatIO { * key and value classes are provided in the Hadoop configuration. */ private void validateConfiguration(Configuration configuration) { - checkNotNull(configuration, "configuration"); - checkNotNull(configuration.get("mapreduce.job.inputformat.class"), - "configuration.get(\"mapreduce.job.inputformat.class\")"); - checkNotNull(configuration.get("key.class"), "configuration.get(\"key.class\")"); - checkNotNull(configuration.get("value.class"), - "configuration.get(\"value.class\")"); + checkArgument(configuration != null, "configuration can not be null"); + checkArgument( + configuration.get("mapreduce.job.inputformat.class") != null, + "Configuration must contain \"mapreduce.job.inputformat.class\""); + checkArgument( + configuration.get("key.class") != null, "configuration must contain \"key.class\""); + checkArgument( + configuration.get("value.class") != null, "configuration must contain \"value.class\""); } /** @@ -315,7 +301,7 @@ public class HadoopInputFormatIO { */ @VisibleForTesting void validateTransform() { - checkNotNull(getConfiguration(), "getConfiguration()"); + checkArgument(getConfiguration() != null, "withConfiguration() is required"); // Validate that the key translation input type must be same as key class of InputFormat. validateTranslationFunction(getinputFormatKeyClass(), getKeyTranslationFunction(), "Key translation's input type is not same as hadoop InputFormat : %s key class : %s"); @@ -422,9 +408,9 @@ public class HadoopInputFormatIO { @Override public void validate() { - checkNotNull(conf, "conf"); - checkNotNull(keyCoder, "keyCoder"); - checkNotNull(valueCoder, "valueCoder"); + checkArgument(conf != null, "conf can not be null"); + checkArgument(keyCoder != null, "keyCoder can not be null"); + checkArgument(valueCoder != null, "valueCoder can not be null"); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java index 9ec3838..a474744 100644 --- a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java +++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java @@ -161,7 +161,7 @@ public class HadoopInputFormatIOTest { */ @Test public void testReadObjectCreationFailsIfConfigurationIsNull() { - thrown.expect(NullPointerException.class); + thrown.expect(IllegalArgumentException.class); HadoopInputFormatIO.read() .withConfiguration(null); } @@ -192,7 +192,7 @@ public class HadoopInputFormatIOTest { */ @Test public void testReadObjectCreationFailsIfKeyTranslationFunctionIsNull() { - thrown.expect(NullPointerException.class); + thrown.expect(IllegalArgumentException.class); HadoopInputFormatIO.read() .withConfiguration(serConf.get()) .withKeyTranslation(null); @@ -225,7 +225,7 @@ public class HadoopInputFormatIOTest { */ @Test public void testReadObjectCreationFailsIfValueTranslationFunctionIsNull() { - thrown.expect(NullPointerException.class); + thrown.expect(IllegalArgumentException.class); HadoopInputFormatIO.read() .withConfiguration(serConf.get()) .withValueTranslation(null); @@ -278,7 +278,7 @@ public class HadoopInputFormatIOTest { @Test public void testReadValidationFailsMissingConfiguration() { HadoopInputFormatIO.Read read = HadoopInputFormatIO.read(); - thrown.expect(NullPointerException.class); + thrown.expect(IllegalArgumentException.class); read.validateTransform(); } @@ -292,7 +292,7 @@ public class HadoopInputFormatIOTest { Configuration configuration = new Configuration(); configuration.setClass("key.class", Text.class, Object.class); configuration.setClass("value.class", Employee.class, Object.class); - thrown.expect(NullPointerException.class); + thrown.expect(IllegalArgumentException.class); HadoopInputFormatIO.read() .withConfiguration(configuration); } @@ -307,7 +307,7 @@ public class HadoopInputFormatIOTest { configuration.setClass("mapreduce.job.inputformat.class", EmployeeInputFormat.class, InputFormat.class); configuration.setClass("value.class", Employee.class, Object.class); - thrown.expect(NullPointerException.class); + thrown.expect(IllegalArgumentException.class); HadoopInputFormatIO.read() .withConfiguration(configuration); } @@ -322,7 +322,7 @@ public class HadoopInputFormatIOTest { configuration.setClass("mapreduce.job.inputformat.class", EmployeeInputFormat.class, InputFormat.class); configuration.setClass("key.class", Text.class, Object.class); - thrown.expect(NullPointerException.class); + thrown.expect(IllegalArgumentException.class); HadoopInputFormatIO.read().withConfiguration(configuration); } http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index 41ced93..393402a 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -159,78 +159,69 @@ public class HBaseIO { return new Read(null, "", new SerializableScan(new Scan())); } - /** - * A {@link PTransform} that reads from HBase. See the class-level Javadoc on {@link HBaseIO} for - * more information. - * - * @see HBaseIO - */ - public static class Read extends PTransform> { - /** - * Returns a new {@link HBaseIO.Read} that will read from the HBase instance indicated by the - * given configuration. - */ - public Read withConfiguration(Configuration configuration) { - checkNotNull(configuration, "conf"); - return new Read(new SerializableConfiguration(configuration), tableId, serializableScan); - } - /** - * Returns a new {@link HBaseIO.Read} that will read from the specified table. + * A {@link PTransform} that reads from HBase. See the class-level Javadoc on + {@link HBaseIO} for* more information. * - *

Does not modify this object. + * @see HBaseIO */ - public Read withTableId(String tableId) { - checkNotNull(tableId, "tableId"); - return new Read(serializableConfiguration, tableId, serializableScan); - } + public static class Read extends PTransform> { + /** + Reads from the HBase instance + indicated by the* given configuration.*/ + + public Read withConfiguration(Configuration configuration) { + checkArgument(configuration != null, "configuration can not be null"); + return new Read(new SerializableConfiguration(configuration), + tableId, serializableScan); + } - /** - * Returns a new {@link HBaseIO.Read} that will filter the rows read from HBase using the given - * scan. - * - *

Does not modify this object. - */ - public Read withScan(Scan scan) { - checkNotNull(scan, "scan"); - return new Read(serializableConfiguration, tableId, new SerializableScan(scan)); - } + /** + Reads from the specified table.*/ - /** - * Returns a new {@link HBaseIO.Read} that will filter the rows read from HBase using the given - * row filter. - * - *

Does not modify this object. - */ - public Read withFilter(Filter filter) { - checkNotNull(filter, "filter"); - return withScan(serializableScan.get().setFilter(filter)); - } + public Read withTableId(String tableId) { + checkArgument(tableId != null, "tableIdcan not be null"); + return new Read(serializableConfiguration, tableId, serializableScan); + } - /** - * Returns a new {@link HBaseIO.Read} that will read only rows in the specified range. - * - *

Does not modify this object. - */ - public Read withKeyRange(ByteKeyRange keyRange) { - checkNotNull(keyRange, "keyRange"); - byte[] startRow = keyRange.getStartKey().getBytes(); - byte[] stopRow = keyRange.getEndKey().getBytes(); - return withScan(serializableScan.get().setStartRow(startRow).setStopRow(stopRow)); - } + /** + Filters the rows read from HBase + using the given* scan.*/ - /** - * Returns a new {@link HBaseIO.Read} that will read only rows in the specified range. - * - *

Does not modify this object. - */ - public Read withKeyRange(byte[] startRow, byte[] stopRow) { - checkNotNull(startRow, "startRow"); - checkNotNull(stopRow, "stopRow"); - ByteKeyRange keyRange = - ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow)); - return withKeyRange(keyRange); - } + public Read withScan(Scan scan) { + checkArgument(scan != null, "scancan not be null"); + return new Read(serializableConfiguration, tableId, new SerializableScan(scan)); + } + + /** + Filters the rows read from HBase + using the given* row filter.*/ + + public Read withFilter(Filter filter) { + checkArgument(filter != null, "filtercan not be null"); + return withScan(serializableScan.get().setFilter(filter)); + } + + /** + Reads only rows in the specified range.*/ + + public Read withKeyRange(ByteKeyRange keyRange) { + checkArgument(keyRange != null, "keyRangecan not be null"); + byte[] startRow = keyRange.getStartKey().getBytes(); + byte[] stopRow = keyRange.getEndKey().getBytes(); + return withScan(serializableScan.get().setStartRow(startRow).setStopRow(stopRow)); + } + + /** + Reads only rows in the specified range.*/ + + public Read withKeyRange(byte[] startRow, byte[] stopRow) { + checkArgument(startRow != null, "startRowcan not be null"); + checkArgument(stopRow != null, "stopRowcan not be null"); + ByteKeyRange keyRange = + ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow)); + return withKeyRange(keyRange); + } private Read( SerializableConfiguration serializableConfiguration, @@ -241,25 +232,22 @@ public class HBaseIO { this.serializableScan = serializableScan; } - @Override - public PCollection expand(PBegin input) { - HBaseSource source = new HBaseSource(this, null /* estimatedSizeBytes */); - return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source)); - } - - @Override - public void validate(PipelineOptions options) { - checkArgument(serializableConfiguration != null, "Configuration not provided"); - checkArgument(!tableId.isEmpty(), "Table ID not specified"); - try (Connection connection = - ConnectionFactory.createConnection(serializableConfiguration.get())) { - Admin admin = connection.getAdmin(); - checkArgument( - admin.tableExists(TableName.valueOf(tableId)), "Table %s does not exist", tableId); - } catch (IOException e) { - LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e); - } - } + @Override + public PCollection expand(PBegin input) { + checkArgument(serializableConfiguration != null, + "withConfiguration() is required"); + checkArgument(!tableId.isEmpty(), "withTableId() is required"); + try (Connection connection = ConnectionFactory.createConnection( + serializableConfiguration.get())) { + Admin admin = connection.getAdmin(); + checkArgument(admin.tableExists(TableName.valueOf(tableId)), + "Table %s does not exist", tableId); + } catch (IOException e) { + LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e); + } + HBaseSource source = new HBaseSource(this, null /* estimatedSizeBytes */); + return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source)); + } @Override public void populateDisplayData(DisplayData.Builder builder) { @@ -609,58 +597,50 @@ public class HBaseIO { return new Write(null /* SerializableConfiguration */, ""); } - /** - * A {@link PTransform} that writes to HBase. See the class-level Javadoc on {@link HBaseIO} for - * more information. - * - * @see HBaseIO - */ - public static class Write extends PTransform, PDone> { /** - * Returns a new {@link HBaseIO.Write} that will write to the HBase instance indicated by the - * given Configuration, and using any other specified customizations. + * A {@link PTransform} that writes to HBase. See the class-level Javadoc on + {@link HBaseIO} for* more information. * - *

Does not modify this object. + * @see HBaseIO */ - public Write withConfiguration(Configuration configuration) { - checkNotNull(configuration, "conf"); - return new Write(new SerializableConfiguration(configuration), tableId); - } + public static class Write extends PTransform, PDone> { + /** + Writes to the HBase instance + indicated by the* given Configuration. + */ + public Write withConfiguration(Configuration configuration) { + checkArgument(configuration != null, "configuration can not be null"); + return new Write(new SerializableConfiguration(configuration), tableId); + } - /** - * Returns a new {@link HBaseIO.Write} that will write to the specified table. - * - *

Does not modify this object. - */ - public Write withTableId(String tableId) { - checkNotNull(tableId, "tableId"); - return new Write(serializableConfiguration, tableId); - } + /** + Writes to the specified table.*/ + + public Write withTableId(String tableId) { + checkArgument(tableId != null, "tableIdcan not be null"); + return new Write(serializableConfiguration, tableId); + } private Write(SerializableConfiguration serializableConfiguration, String tableId) { this.serializableConfiguration = serializableConfiguration; this.tableId = tableId; } - @Override - public PDone expand(PCollection input) { - input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration))); - return PDone.in(input.getPipeline()); - } - - @Override - public void validate(PipelineOptions options) { - checkArgument(serializableConfiguration != null, "Configuration not specified"); - checkArgument(!tableId.isEmpty(), "Table ID not specified"); - try (Connection connection = - ConnectionFactory.createConnection(serializableConfiguration.get())) { - Admin admin = connection.getAdmin(); - checkArgument( - admin.tableExists(TableName.valueOf(tableId)), "Table %s does not exist", tableId); - } catch (IOException e) { - LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e); - } - } + @Override + public PDone expand(PCollection input) { + checkArgument(serializableConfiguration != null, "withConfiguration() is required"); + checkArgument(tableId != null && !tableId.isEmpty(), "withTableId() is required"); + try (Connection connection = ConnectionFactory.createConnection( + serializableConfiguration.get())) { + Admin admin = connection.getAdmin(); + checkArgument(admin.tableExists(TableName.valueOf(tableId)), + "Table %s does not exist", tableId); + } catch (IOException e) { + LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e); + } + input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration))); + return PDone.in(input.getPipeline()); + } @Override public void populateDisplayData(DisplayData.Builder builder) { http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java index e6f7ac4..73ba64b 100644 --- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java +++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java @@ -151,21 +151,21 @@ public class HBaseIOTest { public void testWriteValidationFailsMissingTable() { HBaseIO.Write write = HBaseIO.write().withConfiguration(conf); thrown.expect(IllegalArgumentException.class); - write.validate(null /* input */); + write.expand(null /* input */); } @Test public void testWriteValidationFailsMissingConfiguration() { HBaseIO.Write write = HBaseIO.write().withTableId("table"); thrown.expect(IllegalArgumentException.class); - write.validate(null /* input */); + write.expand(null /* input */); } /** Tests that when reading from a non-existent table, the read fails. */ @Test public void testReadingFailsTableDoesNotExist() throws Exception { final String table = "TEST-TABLE-INVALID"; - // Exception will be thrown by read.validate() when read is applied. + // Exception will be thrown by read.expand() when read is applied. thrown.expect(IllegalArgumentException.class); thrown.expectMessage(String.format("Table %s does not exist", table)); runReadTest(HBaseIO.read().withConfiguration(conf).withTableId(table), new ArrayList()); @@ -355,14 +355,14 @@ public class HBaseIOTest { public void testWritingFailsTableDoesNotExist() throws Exception { final String table = "TEST-TABLE-DOES-NOT-EXIST"; - p.apply(Create.empty(HBaseMutationCoder.of())) - .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table)); - // Exception will be thrown by write.validate() when write is applied. - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(String.format("Table %s does not exist", table)); - p.run(); - } + + // Exception will be thrown by write.expand() when write is applied. + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(String.format("Table %s does not exist", table)); + p.apply(Create.empty(HBaseMutationCoder.of())) + .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table)); + } /** Tests that when writing an element fails, the write fails. */ @Test http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java index d8e462b..8ff9a28 100644 --- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java +++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.hcatalog; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; @@ -180,13 +179,10 @@ public class HCatalogIO { @Override public PCollection expand(PBegin input) { - return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedHCatalogSource(this))); - } + checkArgument(getTable() != null, "withTable() is required"); + checkArgument(getConfigProperties() != null, "withConfigProperties() is required"); - @Override - public void validate(PipelineOptions options) { - checkNotNull(getTable(), "table"); - checkNotNull(getConfigProperties(), "configProperties"); + return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedHCatalogSource(this))); } @Override @@ -215,11 +211,6 @@ public class HCatalogIO { } @Override - public void validate() { - spec.validate(null); - } - - @Override public void populateDisplayData(DisplayData.Builder builder) { spec.populateDisplayData(builder); } @@ -396,16 +387,12 @@ public class HCatalogIO { @Override public PDone expand(PCollection input) { + checkArgument(getConfigProperties() != null, "withConfigProperties() is required"); + checkArgument(getTable() != null, "withTable() is required"); input.apply(ParDo.of(new WriteFn(this))); return PDone.in(input.getPipeline()); } - @Override - public void validate(PipelineOptions options) { - checkNotNull(getConfigProperties(), "configProperties"); - checkNotNull(getTable(), "table"); - } - private static class WriteFn extends DoFn { private final Write spec; private WriterContext writerContext; http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java index 91671a5..dc53c84 100644 --- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java +++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java @@ -175,19 +175,19 @@ public class HCatalogIOTest implements Serializable { /** Test of Write without specifying a table. */ @Test public void testWriteFailureValidationTable() throws Exception { - thrown.expect(NullPointerException.class); - thrown.expectMessage(containsString("table")); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("withTable() is required"); HCatalogIO.write() .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf())) - .validate(null); + .expand(null); } /** Test of Write without specifying configuration properties. */ @Test public void testWriteFailureValidationConfigProp() throws Exception { - thrown.expect(NullPointerException.class); - thrown.expectMessage(containsString("configProperties")); - HCatalogIO.write().withTable("myowntable").validate(null); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("withConfigProperties() is required"); + HCatalogIO.write().withTable("myowntable").expand(null); } /** Test of Read from a non-existent table. */ @@ -204,19 +204,19 @@ public class HCatalogIOTest implements Serializable { /** Test of Read without specifying configuration properties. */ @Test public void testReadFailureValidationConfig() throws Exception { - thrown.expect(NullPointerException.class); - thrown.expectMessage(containsString("configProperties")); - HCatalogIO.read().withTable("myowntable").validate(null); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("withConfigProperties() is required"); + HCatalogIO.read().withTable("myowntable").expand(null); } /** Test of Read without specifying a table. */ @Test public void testReadFailureValidationTable() throws Exception { - thrown.expect(NullPointerException.class); - thrown.expectMessage(containsString("table")); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("withTable() is required"); HCatalogIO.read() .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf())) - .validate(null); + .expand(null); } /** Test of Read using SourceTestUtils.readFromSource(..). */ http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java index dc30b2d..14b766e 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.jdbc; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; import java.io.Serializable; @@ -31,7 +30,6 @@ import javax.annotation.Nullable; import javax.sql.DataSource; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -205,20 +203,16 @@ public class JdbcIO { } public static DataSourceConfiguration create(DataSource dataSource) { - checkArgument(dataSource != null, "DataSourceConfiguration.create(dataSource) called with " - + "null data source"); - checkArgument(dataSource instanceof Serializable, - "DataSourceConfiguration.create(dataSource) called with a dataSource not Serializable"); + checkArgument(dataSource != null, "dataSource can not be null"); + checkArgument(dataSource instanceof Serializable, "dataSource must be Serializable"); return new AutoValue_JdbcIO_DataSourceConfiguration.Builder() .setDataSource(dataSource) .build(); } public static DataSourceConfiguration create(String driverClassName, String url) { - checkArgument(driverClassName != null, - "DataSourceConfiguration.create(driverClassName, url) called with null driverClassName"); - checkArgument(url != null, - "DataSourceConfiguration.create(driverClassName, url) called with null url"); + checkArgument(driverClassName != null, "driverClassName can not be null"); + checkArgument(url != null, "url can not be null"); return new AutoValue_JdbcIO_DataSourceConfiguration.Builder() .setDriverClassName(driverClassName) .setUrl(url) @@ -241,9 +235,7 @@ public class JdbcIO { * {@link #withPassword(String)}, so they do not need to be included here. */ public DataSourceConfiguration withConnectionProperties(String connectionProperties) { - checkArgument(connectionProperties != null, "DataSourceConfiguration.create(driver, url)" - + ".withConnectionProperties(connectionProperties) " - + "called with null connectionProperties"); + checkArgument(connectionProperties != null, "connectionProperties can not be null"); return builder().setConnectionProperties(connectionProperties).build(); } @@ -305,41 +297,43 @@ public class JdbcIO { } public Read withDataSourceConfiguration(DataSourceConfiguration configuration) { - checkArgument(configuration != null, "JdbcIO.read().withDataSourceConfiguration" - + "(configuration) called with null configuration"); + checkArgument(configuration != null, "configuration can not be null"); return toBuilder().setDataSourceConfiguration(configuration).build(); } public Read withQuery(String query) { - checkArgument(query != null, "JdbcIO.read().withQuery(query) called with null query"); + checkArgument(query != null, "query can not be null"); return withQuery(ValueProvider.StaticValueProvider.of(query)); } public Read withQuery(ValueProvider query) { - checkArgument(query != null, "JdbcIO.read().withQuery(query) called with null query"); + checkArgument(query != null, "query can not be null"); return toBuilder().setQuery(query).build(); } public Read withStatementPreparator(StatementPreparator statementPreparator) { - checkArgument(statementPreparator != null, - "JdbcIO.read().withStatementPreparator(statementPreparator) called " - + "with null statementPreparator"); + checkArgument(statementPreparator != null, "statementPreparator can not be null"); return toBuilder().setStatementPreparator(statementPreparator).build(); } public Read withRowMapper(RowMapper rowMapper) { - checkArgument(rowMapper != null, - "JdbcIO.read().withRowMapper(rowMapper) called with null rowMapper"); + checkArgument(rowMapper != null, "rowMapper can not be null"); return toBuilder().setRowMapper(rowMapper).build(); } public Read withCoder(Coder coder) { - checkArgument(coder != null, "JdbcIO.read().withCoder(coder) called with null coder"); + checkArgument(coder != null, "coder can not be null"); return toBuilder().setCoder(coder).build(); } @Override public PCollection expand(PBegin input) { + checkArgument(getQuery() != null, "withQuery() is required"); + checkArgument(getRowMapper() != null, "withRowMapper() is required"); + checkArgument(getCoder() != null, "withCoder() is required"); + checkArgument( + getDataSourceConfiguration() != null, "withDataSourceConfiguration() is required"); + return input .apply(Create.of((Void) null)) .apply( @@ -361,19 +355,6 @@ public class JdbcIO { } @Override - public void validate(PipelineOptions options) { - checkState(getQuery() != null, - "JdbcIO.read() requires a query to be set via withQuery(query)"); - checkState(getRowMapper() != null, - "JdbcIO.read() requires a rowMapper to be set via withRowMapper(rowMapper)"); - checkState(getCoder() != null, - "JdbcIO.read() requires a coder to be set via withCoder(coder)"); - checkState(getDataSourceConfiguration() != null, - "JdbcIO.read() requires a DataSource configuration to be set via " - + "withDataSourceConfiguration(dataSourceConfiguration)"); - } - - @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); builder.add(DisplayData.item("query", getQuery())); @@ -460,19 +441,6 @@ public class JdbcIO { } @Override - public void validate(PipelineOptions options) { - checkState(getQuery() != null, - "JdbcIO.read() requires a query to be set via withQuery(query)"); - checkState(getRowMapper() != null, - "JdbcIO.read() requires a rowMapper to be set via withRowMapper(rowMapper)"); - checkState(getCoder() != null, - "JdbcIO.read() requires a coder to be set via withCoder(coder)"); - checkState(getDataSourceConfiguration() != null, - "JdbcIO.read() requires a DataSource configuration to be set via " - + "withDataSourceConfiguration(dataSourceConfiguration)"); - } - - @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); builder.add(DisplayData.item("query", getQuery())); @@ -568,22 +536,16 @@ public class JdbcIO { @Override public PDone expand(PCollection input) { + checkArgument( + getDataSourceConfiguration() != null, "withDataSourceConfiguration() is required"); + checkArgument(getStatement() != null, "withStatement() is required"); + checkArgument( + getPreparedStatementSetter() != null, "withPreparedStatementSetter() is required"); + input.apply(ParDo.of(new WriteFn(this))); return PDone.in(input.getPipeline()); } - @Override - public void validate(PipelineOptions options) { - checkArgument(getDataSourceConfiguration() != null, - "JdbcIO.write() requires a configuration to be set via " - + ".withDataSourceConfiguration(configuration)"); - checkArgument(getStatement() != null, - "JdbcIO.write() requires a statement to be set via .withStatement(statement)"); - checkArgument(getPreparedStatementSetter() != null, - "JdbcIO.write() requires a preparedStatementSetter to be set via " - + ".withPreparedStatementSetter(preparedStatementSetter)"); - } - private static class WriteFn extends DoFn { private static final int DEFAULT_BATCH_SIZE = 1000; http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index 2af0ce9..b3a9c8b 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.jms; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; @@ -165,8 +164,7 @@ public class JmsIO { * @return The corresponding {@link JmsIO.Read}. */ public Read withConnectionFactory(ConnectionFactory connectionFactory) { - checkArgument(connectionFactory != null, "withConnectionFactory(connectionFactory) called" - + " with null connectionFactory"); + checkArgument(connectionFactory != null, "connectionFactory can not be null"); return builder().setConnectionFactory(connectionFactory).build(); } @@ -189,7 +187,7 @@ public class JmsIO { * @return The corresponding {@link JmsIO.Read}. */ public Read withQueue(String queue) { - checkArgument(queue != null, "withQueue(queue) called with null queue"); + checkArgument(queue != null, "queue can not be null"); return builder().setQueue(queue).build(); } @@ -212,7 +210,7 @@ public class JmsIO { * @return The corresponding {@link JmsIO.Read}. */ public Read withTopic(String topic) { - checkArgument(topic != null, "withTopic(topic) called with null topic"); + checkArgument(topic != null, "topic can not be null"); return builder().setTopic(topic).build(); } @@ -220,8 +218,7 @@ public class JmsIO { * Define the username to connect to the JMS broker (authenticated). */ public Read withUsername(String username) { - checkArgument(username != null, "JmsIO.read().withUsername(username) called with null " - + "username"); + checkArgument(username != null, "username can not be null"); return builder().setUsername(username).build(); } @@ -229,8 +226,7 @@ public class JmsIO { * Define the password to connect to the JMS broker (authenticated). */ public Read withPassword(String password) { - checkArgument(password != null, "JmsIO.read().withPassword(password) called with null " - + "password"); + checkArgument(password != null, "password can not be null"); return builder().setPassword(password).build(); } @@ -251,8 +247,7 @@ public class JmsIO { * @return The corresponding {@link JmsIO.Read}. */ public Read withMaxNumRecords(long maxNumRecords) { - checkArgument(maxNumRecords >= 0, "withMaxNumRecords(maxNumRecords) called with invalid " - + "maxNumRecords"); + checkArgument(maxNumRecords >= 0, "maxNumRecords must be > 0, but was: %d", maxNumRecords); return builder().setMaxNumRecords(maxNumRecords).build(); } @@ -273,13 +268,20 @@ public class JmsIO { * @return The corresponding {@link JmsIO.Read}. */ public Read withMaxReadTime(Duration maxReadTime) { - checkArgument(maxReadTime != null, "withMaxReadTime(maxReadTime) called with null " - + "maxReadTime"); + checkArgument(maxReadTime != null, "maxReadTime can not be null"); return builder().setMaxReadTime(maxReadTime).build(); } @Override public PCollection expand(PBegin input) { + checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required"); + checkArgument( + getQueue() != null || getTopic() != null, + "Either withQueue() or withTopic() is required"); + checkArgument( + getQueue() == null || getTopic() == null, + "withQueue() and withTopic() are exclusive"); + // handles unbounded source to bounded conversion if maxNumRecords is set. Unbounded unbounded = org.apache.beam.sdk.io.Read.from(createSource()); @@ -295,15 +297,6 @@ public class JmsIO { } @Override - public void validate(PipelineOptions options) { - checkState(getConnectionFactory() != null, "JmsIO.read() requires a JMS connection " - + "factory to be set via withConnectionFactory(connectionFactory)"); - checkState((getQueue() != null || getTopic() != null), "JmsIO.read() requires a JMS " - + "destination (queue or topic) to be set via withQueue(queueName) or withTopic" - + "(topicName)"); - } - - @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); builder.addIfNotNull(DisplayData.item("queue", getQueue())); @@ -363,11 +356,6 @@ public class JmsIO { } @Override - public void validate() { - spec.validate(null); - } - - @Override public Coder getCheckpointMarkCoder() { return AvroCoder.of(JmsCheckpointMark.class); } @@ -579,8 +567,7 @@ public class JmsIO { * @return The corresponding {@link JmsIO.Read}. */ public Write withConnectionFactory(ConnectionFactory connectionFactory) { - checkArgument(connectionFactory != null, "withConnectionFactory(connectionFactory) called" - + " with null connectionFactory"); + checkArgument(connectionFactory != null, "connectionFactory can not be null"); return builder().setConnectionFactory(connectionFactory).build(); } @@ -603,7 +590,7 @@ public class JmsIO { * @return The corresponding {@link JmsIO.Read}. */ public Write withQueue(String queue) { - checkArgument(queue != null, "withQueue(queue) called with null queue"); + checkArgument(queue != null, "queue can not be null"); return builder().setQueue(queue).build(); } @@ -626,7 +613,7 @@ public class JmsIO { * @return The corresponding {@link JmsIO.Read}. */ public Write withTopic(String topic) { - checkArgument(topic != null, "withTopic(topic) called with null topic"); + checkArgument(topic != null, "topic can not be null"); return builder().setTopic(topic).build(); } @@ -634,8 +621,7 @@ public class JmsIO { * Define the username to connect to the JMS broker (authenticated). */ public Write withUsername(String username) { - checkArgument(username != null, "JmsIO.write().withUsername(username) called with null " - + "username"); + checkArgument(username != null, "username can not be null"); return builder().setUsername(username).build(); } @@ -643,25 +629,24 @@ public class JmsIO { * Define the password to connect to the JMS broker (authenticated). */ public Write withPassword(String password) { - checkArgument(password != null, "JmsIO.write().withPassword(password) called with null " - + "password"); + checkArgument(password != null, "password can not be null"); return builder().setPassword(password).build(); } @Override public PDone expand(PCollection input) { + checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required"); + checkArgument( + getQueue() != null || getTopic() != null, + "Either withQueue(queue) or withTopic(topic) is required"); + checkArgument( + getQueue() == null || getTopic() == null, + "withQueue(queue) and withTopic(topic) are exclusive"); + input.apply(ParDo.of(new WriterFn(this))); return PDone.in(input.getPipeline()); } - @Override - public void validate(PipelineOptions options) { - checkState(getConnectionFactory() != null, "JmsIO.write() requires a JMS connection " - + "factory to be set via withConnectionFactory(connectionFactory)"); - checkState((getQueue() != null || getTopic() != null), "JmsIO.write() requires a JMS " - + "destination (queue or topic) to be set via withQueue(queue) or withTopic(topic)"); - } - private static class WriterFn extends DoFn { private Write spec; http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index dae4c1d..aab99c3 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -487,7 +487,7 @@ public class KafkaIO { */ public Read withTimestampFn2( SerializableFunction, Instant> timestampFn) { - checkNotNull(timestampFn); + checkArgument(timestampFn != null, "timestampFn can not be null"); return toBuilder().setTimestampFn(timestampFn).build(); } @@ -497,7 +497,7 @@ public class KafkaIO { */ public Read withWatermarkFn2( SerializableFunction, Instant> watermarkFn) { - checkNotNull(watermarkFn); + checkArgument(watermarkFn != null, "watermarkFn can not be null"); return toBuilder().setWatermarkFn(watermarkFn).build(); } @@ -505,7 +505,7 @@ public class KafkaIO { * A function to assign a timestamp to a record. Default is processing timestamp. */ public Read withTimestampFn(SerializableFunction, Instant> timestampFn) { - checkNotNull(timestampFn); + checkArgument(timestampFn != null, "timestampFn can not be null"); return withTimestampFn2(unwrapKafkaAndThen(timestampFn)); } @@ -514,7 +514,7 @@ public class KafkaIO { * @see #withTimestampFn(SerializableFunction) */ public Read withWatermarkFn(SerializableFunction, Instant> watermarkFn) { - checkNotNull(watermarkFn); + checkArgument(watermarkFn != null, "watermarkFn can not be null"); return withWatermarkFn2(unwrapKafkaAndThen(watermarkFn)); } @@ -526,13 +526,14 @@ public class KafkaIO { } @Override - public void validate(PipelineOptions options) { - checkNotNull(getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), - "Kafka bootstrap servers should be set"); + public PCollection> expand(PBegin input) { + checkArgument( + getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) != null, + "withBootstrapServers() is required"); checkArgument(getTopics().size() > 0 || getTopicPartitions().size() > 0, - "Kafka topics or topic_partitions are required"); - checkNotNull(getKeyDeserializer(), "Key deserializer must be set"); - checkNotNull(getValueDeserializer(), "Value deserializer must be set"); + "Either withTopic(), withTopics() or withTopicPartitions() is required"); + checkArgument(getKeyDeserializer() != null, "withKeyDeserializer() is required"); + checkArgument(getValueDeserializer() != null, "withValueDeserializer() is required"); if (getStartReadTime() != null) { checkArgument(new ConsumerSpEL().hasOffsetsForTimes(), "Consumer.offsetsForTimes is only supported by Kafka Client 0.10.1.0 onwards, " @@ -540,25 +541,23 @@ public class KafkaIO { + ". If you are building with maven, set \"kafka.clients.version\" " + "maven property to 0.10.1.0 or newer."); } - } - @Override - public PCollection> expand(PBegin input) { // Infer key/value coders if not specified explicitly CoderRegistry registry = input.getPipeline().getCoderRegistry(); Coder keyCoder = - checkNotNull( - getKeyCoder() != null ? getKeyCoder() : inferCoder(registry, getKeyDeserializer()), - "Key coder could not be inferred from key deserializer. Please provide" - + "key coder explicitly using withKeyDeserializerAndCoder()"); + getKeyCoder() != null ? getKeyCoder() : inferCoder(registry, getKeyDeserializer()); + checkArgument( + keyCoder != null, + "Key coder could not be inferred from key deserializer. Please provide" + + "key coder explicitly using withKeyDeserializerAndCoder()"); Coder valueCoder = - checkNotNull( - getValueCoder() != null ? getValueCoder() - : inferCoder(registry, getValueDeserializer()), - "Value coder could not be inferred from value deserializer. Please provide" - + "value coder explicitly using withValueDeserializerAndCoder()"); + getValueCoder() != null ? getValueCoder() : inferCoder(registry, getValueDeserializer()); + checkArgument( + valueCoder != null, + "Value coder could not be inferred from value deserializer. Please provide" + + "value coder explicitly using withValueDeserializerAndCoder()"); // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set. Unbounded> unbounded = @@ -840,11 +839,6 @@ public class KafkaIO { } @Override - public void validate() { - spec.validate(null); - } - - @Override public Coder> getOutputCoder() { return KafkaRecordCoder.of(spec.getKeyCoder(), spec.getValueCoder()); } @@ -1488,17 +1482,15 @@ public class KafkaIO { @Override public PDone expand(PCollection> input) { + checkArgument( + getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) != null, + "withBootstrapServers() is required"); + checkArgument(getTopic() != null, "withTopic() is required"); + input.apply(ParDo.of(new KafkaWriter<>(this))); return PDone.in(input.getPipeline()); } - @Override - public void validate(PipelineOptions options) { - checkNotNull(getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), - "Kafka bootstrap servers should be set"); - checkNotNull(getTopic(), "Kafka topic should be set"); - } - // set config defaults private static final Map DEFAULT_PRODUCER_PROPERTIES = ImmutableMap.of( http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java index bc8ada1..ef39a91 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.kinesis; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; @@ -151,8 +150,7 @@ public final class KinesisIO { public Read from(String streamName, InitialPositionInStream initialPosition) { return toBuilder() .setStreamName(streamName) - .setInitialPosition( - new StartingPoint(checkNotNull(initialPosition, "initialPosition"))) + .setInitialPosition(new StartingPoint(initialPosition)) .build(); } @@ -163,8 +161,7 @@ public final class KinesisIO { public Read from(String streamName, Instant initialTimestamp) { return toBuilder() .setStreamName(streamName) - .setInitialPosition( - new StartingPoint(checkNotNull(initialTimestamp, "initialTimestamp"))) + .setInitialPosition(new StartingPoint(initialTimestamp)) .build(); } @@ -197,7 +194,7 @@ public final class KinesisIO { /** Specifies to read at most a given number of records. */ public Read withMaxReadTime(Duration maxReadTime) { - checkNotNull(maxReadTime, "maxReadTime"); + checkArgument(maxReadTime != null, "maxReadTime can not be null"); return toBuilder().setMaxReadTime(maxReadTime).build(); } @@ -226,9 +223,12 @@ public final class KinesisIO { private final Regions region; private BasicKinesisProvider(String accessKey, String secretKey, Regions region) { - this.accessKey = checkNotNull(accessKey, "accessKey"); - this.secretKey = checkNotNull(secretKey, "secretKey"); - this.region = checkNotNull(region, "region"); + checkArgument(accessKey != null, "accessKey can not be null"); + checkArgument(secretKey != null, "secretKey can not be null"); + checkArgument(region != null, "region can not be null"); + this.accessKey = accessKey; + this.secretKey = secretKey; + this.region = region; } private AWSCredentialsProvider getCredentialsProvider() { http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java index e5c32d2..e2fa474 100644 --- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java +++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java @@ -23,7 +23,6 @@ import static com.google.common.collect.Lists.newArrayList; import java.io.IOException; import java.util.List; import java.util.NoSuchElementException; - import org.apache.beam.sdk.io.UnboundedSource; import org.joda.time.Instant; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java index c612d52..4dc2405 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java @@ -431,10 +431,6 @@ public class MongoDbGridFSIO { } @Override - public void validate() { - } - - @Override public void populateDisplayData(DisplayData.Builder builder) { spec.populateDisplayData(builder); } http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java index d29f0ae..9007051 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.io.mongodb; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; @@ -205,8 +204,7 @@ public class MongoDbIO { * Sets the database to use. */ public Read withDatabase(String database) { - checkArgument(database != null, "MongoDbIO.read().withDatabase(database) called with null" - + " database"); + checkArgument(database != null, "database can not be null"); return builder().setDatabase(database).build(); } @@ -214,8 +212,7 @@ public class MongoDbIO { * Sets the collection to consider in the database. */ public Read withCollection(String collection) { - checkArgument(collection != null, "MongoDbIO.read().withCollection(collection) called " - + "with null collection"); + checkArgument(collection != null, "collection can not be null"); return builder().setCollection(collection).build(); } @@ -223,8 +220,7 @@ public class MongoDbIO { * Sets a filter on the documents in a collection. */ public Read withFilter(String filter) { - checkArgument(filter != null, "MongoDbIO.read().withFilter(filter) called with null " - + "filter"); + checkArgument(filter != null, "filter can not be null"); return builder().setFilter(filter).build(); } @@ -232,27 +228,19 @@ public class MongoDbIO { * Sets the user defined number of splits. */ public Read withNumSplits(int numSplits) { - checkArgument(numSplits >= 0, "MongoDbIO.read().withNumSplits(numSplits) called with " - + "invalid number. The number of splits has to be a positive value (currently %d)", - numSplits); + checkArgument(numSplits >= 0, "invalid num_splits: must be >= 0, but was %d", numSplits); return builder().setNumSplits(numSplits).build(); } @Override public PCollection expand(PBegin input) { + checkArgument(uri() != null, "withUri() is required"); + checkArgument(database() != null, "withDatabase() is required"); + checkArgument(collection() != null, "withCollection() is required"); return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedMongoDbSource(this))); } @Override - public void validate(PipelineOptions options) { - checkState(uri() != null, "MongoDbIO.read() requires an URI to be set via withUri(uri)"); - checkState(database() != null, "MongoDbIO.read() requires a database to be set via " - + "withDatabase(database)"); - checkState(collection() != null, "MongoDbIO.read() requires a collection to be set via " - + "withCollection(collection)"); - } - - @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); builder.add(DisplayData.item("uri", uri())); @@ -282,11 +270,6 @@ public class MongoDbIO { } @Override - public void validate() { - spec.validate(null); - } - - @Override public void populateDisplayData(DisplayData.Builder builder) { spec.populateDisplayData(builder); } @@ -576,7 +559,7 @@ public class MongoDbIO { * */ public Write withUri(String uri) { - checkArgument(uri != null, "MongoDbIO.write().withUri(uri) called with null uri"); + checkArgument(uri != null, "uri can not be null"); return builder().setUri(uri).build(); } @@ -598,8 +581,7 @@ public class MongoDbIO { * Sets the database to use. */ public Write withDatabase(String database) { - checkArgument(database != null, "MongoDbIO.write().withDatabase(database) called with " - + "null database"); + checkArgument(database != null, "database can not be null"); return builder().setDatabase(database).build(); } @@ -607,8 +589,7 @@ public class MongoDbIO { * Sets the collection where to write data in the database. */ public Write withCollection(String collection) { - checkArgument(collection != null, "MongoDbIO.write().withCollection(collection) called " - + "with null collection"); + checkArgument(collection != null, "collection can not be null"); return builder().setCollection(collection).build(); } @@ -616,27 +597,21 @@ public class MongoDbIO { * Define the size of the batch to group write operations. */ public Write withBatchSize(long batchSize) { - checkArgument(batchSize >= 0, "MongoDbIO.write().withBatchSize(batchSize) called with " - + "invalid batch size. Batch size has to be >= 0 (currently %d)", batchSize); + checkArgument(batchSize >= 0, "Batch size must be >= 0, but was %d", batchSize); return builder().setBatchSize(batchSize).build(); } @Override public PDone expand(PCollection input) { + checkArgument(uri() != null, "withUri() is required"); + checkArgument(database() != null, "withDatabase() is required"); + checkArgument(collection() != null, "withCollection() is required"); + input.apply(ParDo.of(new WriteFn(this))); return PDone.in(input.getPipeline()); } @Override - public void validate(PipelineOptions options) { - checkState(uri() != null, "MongoDbIO.write() requires an URI to be set via withUri(uri)"); - checkState(database() != null, "MongoDbIO.write() requires a database to be set via " - + "withDatabase(database)"); - checkState(collection() != null, "MongoDbIO.write() requires a collection to be set via " - + "withCollection(collection)"); - } - - @Override public void populateDisplayData(DisplayData.Builder builder) { builder.add(DisplayData.item("uri", uri())); builder.add(DisplayData.item("keepAlive", keepAlive())); http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java index 5aadb80..f9083bb 100644 --- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java +++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java @@ -149,12 +149,8 @@ public class MqttIO { * @return A connection configuration to the MQTT broker. */ public static ConnectionConfiguration create(String serverUri, String topic) { - checkArgument(serverUri != null, - "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null " - + "serverUri"); - checkArgument(topic != null, - "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null " - + "topic"); + checkArgument(serverUri != null, "serverUri can not be null"); + checkArgument(topic != null, "topic can not be null"); return new AutoValue_MqttIO_ConnectionConfiguration.Builder().setServerUri(serverUri) .setTopic(topic).build(); } @@ -168,14 +164,9 @@ public class MqttIO { * @return A connection configuration to the MQTT broker. */ public static ConnectionConfiguration create(String serverUri, String topic, String clientId) { - checkArgument(serverUri != null, - "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null " - + "serverUri"); - checkArgument(topic != null, - "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null " - + "topic"); - checkArgument(clientId != null, "MqttIO.ConnectionConfiguration.create(serverUri," - + "topic, clientId) called with null clientId"); + checkArgument(serverUri != null, "serverUri can not be null"); + checkArgument(topic != null, "topic can not be null"); + checkArgument(clientId != null, "clientId can not be null"); return new AutoValue_MqttIO_ConnectionConfiguration.Builder().setServerUri(serverUri) .setTopic(topic).setClientId(clientId).build(); } @@ -242,9 +233,7 @@ public class MqttIO { * Define the MQTT connection configuration used to connect to the MQTT broker. */ public Read withConnectionConfiguration(ConnectionConfiguration configuration) { - checkArgument(configuration != null, - "MqttIO.read().withConnectionConfiguration(configuration) called with null " - + "configuration or not called at all"); + checkArgument(configuration != null, "configuration can not be null"); return builder().setConnectionConfiguration(configuration).build(); } @@ -254,8 +243,6 @@ public class MqttIO { * will provide a bounded {@link PCollection}. */ public Read withMaxNumRecords(long maxNumRecords) { - checkArgument(maxReadTime() == null, - "maxNumRecord and maxReadTime are exclusive"); return builder().setMaxNumRecords(maxNumRecords).build(); } @@ -265,13 +252,14 @@ public class MqttIO { * {@link PCollection}. */ public Read withMaxReadTime(Duration maxReadTime) { - checkArgument(maxNumRecords() == Long.MAX_VALUE, - "maxNumRecord and maxReadTime are exclusive"); return builder().setMaxReadTime(maxReadTime).build(); } @Override public PCollection expand(PBegin input) { + checkArgument( + maxReadTime() == null || maxNumRecords() == Long.MAX_VALUE, + "withMaxNumRecords() and withMaxReadTime() are exclusive"); org.apache.beam.sdk.io.Read.Unbounded unbounded = org.apache.beam.sdk.io.Read.from(new UnboundedMqttSource(this)); @@ -288,11 +276,6 @@ public class MqttIO { } @Override - public void validate(PipelineOptions options) { - // validation is performed in the ConnectionConfiguration create() - } - - @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); connectionConfiguration().populateDisplayData(builder); @@ -372,11 +355,6 @@ public class MqttIO { } @Override - public void validate() { - spec.validate(null); - } - - @Override public void populateDisplayData(DisplayData.Builder builder) { spec.populateDisplayData(builder); } @@ -511,9 +489,7 @@ public class MqttIO { * Define MQTT connection configuration used to connect to the MQTT broker. */ public Write withConnectionConfiguration(ConnectionConfiguration configuration) { - checkArgument(configuration != null, - "MqttIO.write().withConnectionConfiguration(configuration) called with null " - + "configuration or not called at all"); + checkArgument(configuration != null, "configuration can not be null"); return builder().setConnectionConfiguration(configuration).build(); } @@ -538,11 +514,6 @@ public class MqttIO { } @Override - public void validate(PipelineOptions options) { - // validate is done in connection configuration - } - - @Override public void populateDisplayData(DisplayData.Builder builder) { connectionConfiguration().populateDisplayData(builder); builder.add(DisplayData.item("retained", retained())); http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java index c137eea..f811139 100644 --- a/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java +++ b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java @@ -284,15 +284,11 @@ public class SolrIO { @Override public PCollection expand(PBegin input) { - return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedSolrSource(this, null))); - } + checkArgument( + getConnectionConfiguration() != null, "withConnectionConfiguration() is required"); + checkArgument(getCollection() != null, "from() is required"); - @Override - public void validate(PipelineOptions options) { - checkState( - getConnectionConfiguration() != null, - "Need to set connection configuration using withConnectionConfiguration()"); - checkState(getCollection() != null, "Need to set collection name using to()"); + return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedSolrSource(this, null))); } @Override @@ -464,11 +460,6 @@ public class SolrIO { } @Override - public void validate() { - spec.validate(null); - } - - @Override public Coder getOutputCoder() { return JavaBinCodecCoder.of(SolrDocument.class); } @@ -642,15 +633,12 @@ public class SolrIO { } @Override - public void validate(PipelineOptions options) { + public PDone expand(PCollection input) { checkState( getConnectionConfiguration() != null, - "Need to set connection configuration via withConnectionConfiguration()"); - checkState(getCollection() != null, "Need to set collection name via to()"); - } + "withConnectionConfiguration() is required"); + checkState(getCollection() != null, "to() is required"); - @Override - public PDone expand(PCollection input) { input.apply(ParDo.of(new WriteFn(this))); return PDone.in(input.getPipeline()); } http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java index 749da51..47626cd 100644 --- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.io.xml; -import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkArgument; import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; @@ -36,7 +36,6 @@ import org.apache.beam.sdk.io.FileIO.ReadableFile; import org.apache.beam.sdk.io.OffsetBasedSource; import org.apache.beam.sdk.io.ReadAllViaFileBasedSource; import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.transforms.PTransform; @@ -288,18 +287,10 @@ public class XmlIO { } private void validate() { - checkNotNull( - getRootElement(), - "rootElement is null. Use builder method withRootElement() to set this."); - checkNotNull( - getRecordElement(), - "recordElement is null. Use builder method withRecordElement() to set this."); - checkNotNull( - getRecordClass(), - "recordClass is null. Use builder method withRecordClass() to set this."); - checkNotNull( - getCharset(), - "charset is null. Use builder method withCharset() to set this."); + checkArgument(getRootElement() != null, "withRootElement() is required"); + checkArgument(getRecordElement() != null, "withRecordElement() is required"); + checkArgument(getRecordClass() != null, "withRecordClass() is required"); + checkArgument(getCharset() != null, "withCharset() is required"); } @Override @@ -595,20 +586,17 @@ public class XmlIO { } @Override - public void validate(PipelineOptions options) { - checkNotNull(getRecordClass(), "Missing a class to bind to a JAXB context."); - checkNotNull(getRootElement(), "Missing a root element name."); - checkNotNull(getFilenamePrefix(), "Missing a filename to write to."); - checkNotNull(getCharset(), "Missing charset"); + public PDone expand(PCollection input) { + checkArgument(getRecordClass() != null, "withRecordClass() is required"); + checkArgument(getRootElement() != null, "withRootElement() is required"); + checkArgument(getFilenamePrefix() != null, "to() is required"); + checkArgument(getCharset() != null, "withCharset() is required"); try { JAXBContext.newInstance(getRecordClass()); } catch (JAXBException e) { throw new RuntimeException("Error binding classes to a JAXB Context.", e); } - } - @Override - public PDone expand(PCollection input) { input.apply(org.apache.beam.sdk.io.WriteFiles.to(createSink())); return PDone.in(input.getPipeline()); } http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java index b663544..b54d95b 100644 --- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java +++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java @@ -29,7 +29,6 @@ import org.apache.beam.sdk.io.DynamicFileDestinations; import org.apache.beam.sdk.io.FileBasedSink; import org.apache.beam.sdk.io.ShardNameTemplate; import org.apache.beam.sdk.io.fs.ResourceId; -import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.MimeTypes; @@ -51,15 +50,6 @@ class XmlSink extends FileBasedSink { } /** - * Validates that the root element, class to bind to a JAXB context, and filenamePrefix have - * been set and that the class can be bound in a JAXB context. - */ - @Override - public void validate(PipelineOptions options) { - spec.validate(null); - } - - /** * Creates an {@link XmlWriteOperation}. */ @Override http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java index d1584dc..3834abd 100644 --- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java +++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java @@ -127,25 +127,25 @@ public class XmlSinkTest { /** Validation ensures no fields are missing. */ @Test public void testValidateXmlSinkMissingRecordClass() { - thrown.expect(NullPointerException.class); + thrown.expect(IllegalArgumentException.class); XmlIO.write() .to(testFilePrefix) .withRootElement(testRootElement) - .validate(null); + .expand(null); } @Test public void testValidateXmlSinkMissingRootElement() { - thrown.expect(NullPointerException.class); + thrown.expect(IllegalArgumentException.class); XmlIO.write().withRecordClass(Bird.class) .to(testFilePrefix) - .validate(null); + .expand(null); } @Test public void testValidateXmlSinkMissingOutputDirectory() { - thrown.expect(NullPointerException.class); - XmlIO.write().withRecordClass(Bird.class).withRootElement(testRootElement).validate(null); + thrown.expect(IllegalArgumentException.class); + XmlIO.write().withRecordClass(Bird.class).withRootElement(testRootElement).expand(null); } /**