beam-commits mailing list archives

From j...@apache.org
Subject [1/3] beam git commit: Cleans up PTransform validation across Beam
Date Mon, 11 Sep 2017 22:38:17 GMT
Repository: beam
Updated Branches:
  refs/heads/master 98b0fd2b4 -> e8bf045f6


http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java
index 6eb1a33..ad4e47b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java
@@ -84,75 +84,6 @@ public class SpannerIOReadTest implements Serializable {
   }
 
   @Test
-  public void emptyTransform() throws Exception {
-    SpannerIO.Read read = SpannerIO.read();
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("requires instance id to be set with");
-    read.validate(null);
-  }
-
-  @Test
-  public void emptyInstanceId() throws Exception {
-    SpannerIO.Read read = SpannerIO.read().withDatabaseId("123");
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("requires instance id to be set with");
-    read.validate(null);
-  }
-
-  @Test
-  public void emptyDatabaseId() throws Exception {
-    SpannerIO.Read read = SpannerIO.read().withInstanceId("123");
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("requires database id to be set with");
-    read.validate(null);
-  }
-
-  @Test
-  public void emptyQuery() throws Exception {
-    SpannerIO.Read read =
-        SpannerIO.read().withInstanceId("123").withDatabaseId("aaa").withTimestamp(Timestamp.now());
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("requires configuring query or read operation");
-    read.validate(null);
-  }
-
-  @Test
-  public void emptyColumns() throws Exception {
-    SpannerIO.Read read =
-        SpannerIO.read()
-            .withInstanceId("123")
-            .withDatabaseId("aaa")
-            .withTimestamp(Timestamp.now())
-            .withTable("users");
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("requires a list of columns");
-    read.validate(null);
-  }
-
-  @Test
-  public void validRead() throws Exception {
-    SpannerIO.Read read =
-        SpannerIO.read()
-            .withInstanceId("123")
-            .withDatabaseId("aaa")
-            .withTimestamp(Timestamp.now())
-            .withTable("users")
-            .withColumns("id", "name", "email");
-    read.validate(null);
-  }
-
-  @Test
-  public void validQuery() throws Exception {
-    SpannerIO.Read read =
-        SpannerIO.read()
-            .withInstanceId("123")
-            .withDatabaseId("aaa")
-            .withTimestamp(Timestamp.now())
-            .withQuery("SELECT * FROM users");
-    read.validate(null);
-  }
-
-  @Test
   public void runQuery() throws Exception {
     SpannerIO.Read read =
         SpannerIO.read()

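The tests deleted above called Read.validate(null) directly. With validation folded into expansion, equivalent coverage calls expand(null) instead, exactly as the SpannerIOWriteTest changes below do. A hypothetical sketch of one replacement test, assuming the surrounding SpannerIOReadTest fixture (the thrown ExpectedException rule); the actual SpannerIO.Read changes are in another part of this commit:

    @Test
    public void emptyInstanceId() throws Exception {
      SpannerIO.Read read = SpannerIO.read().withDatabaseId("123");
      thrown.expect(NullPointerException.class);
      thrown.expectMessage("requires instance id to be set with");
      // Validation now runs when the transform is expanded, before the null
      // input is ever touched.
      read.expand(null);
    }
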
http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
index 09cdb8e..53783d1 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
@@ -66,7 +66,7 @@ public class SpannerIOWriteTest implements Serializable {
     SpannerIO.Write write = SpannerIO.write();
     thrown.expect(NullPointerException.class);
     thrown.expectMessage("requires instance id to be set with");
-    write.validate(null);
+    write.expand(null);
   }
 
   @Test
@@ -74,7 +74,7 @@ public class SpannerIOWriteTest implements Serializable {
     SpannerIO.Write write = SpannerIO.write().withDatabaseId("123");
     thrown.expect(NullPointerException.class);
     thrown.expectMessage("requires instance id to be set with");
-    write.validate(null);
+    write.expand(null);
   }
 
   @Test
@@ -82,7 +82,7 @@ public class SpannerIOWriteTest implements Serializable {
     SpannerIO.Write write = SpannerIO.write().withInstanceId("123");
     thrown.expect(NullPointerException.class);
     thrown.expectMessage("requires database id to be set with");
-    write.validate(null);
+    write.expand(null);
   }
 
   @Test

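These Write tests still expect NullPointerException: the checks that moved into expand() evidently remain checkNotNull calls, whose contract is to throw NPE carrying the supplied message. A dependency-free sketch of that contract, using the exact substring the tests pin:

    import static com.google.common.base.Preconditions.checkNotNull;

    public class CheckNotNullDemo {
      public static void main(String[] args) {
        String instanceId = null;
        // Throws NullPointerException whose message contains the text below,
        // which is what thrown.expectMessage(...) matches against.
        checkNotNull(instanceId, "requires instance id to be set with");
      }
    }
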
http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/hadoop/input-format/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/pom.xml b/sdks/java/io/hadoop/input-format/pom.xml
index 0953119..910a009 100644
--- a/sdks/java/io/hadoop/input-format/pom.xml
+++ b/sdks/java/io/hadoop/input-format/pom.xml
@@ -27,15 +27,10 @@
   <name>Apache Beam :: SDKs :: Java :: IO :: Hadoop :: input-format</name>
   <description>IO to read data from data sources which implement Hadoop Input Format.</description>
 
-  <properties>
-    <guava.version>19.0</guava.version>
-  </properties>
-
   <dependencies>
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <version>${guava.version}</version>
     </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
index 20ca50a..89df555 100644
--- a/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
+++ b/sdks/java/io/hadoop/input-format/src/main/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIO.java
@@ -15,7 +15,6 @@
 package org.apache.beam.sdk.io.hadoop.inputformat;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
@@ -219,12 +218,7 @@ public class HadoopInputFormatIO {
       abstract Read<K, V> build();
     }
 
-    /**
-     * Returns a new {@link HadoopInputFormatIO.Read} that will read from the source using the
-     * options provided by the given configuration.
-     *
-     * <p>Does not modify this object.
-     */
+    /** Reads from the source using the options provided by the given configuration. */
     public Read<K, V> withConfiguration(Configuration configuration) {
       validateConfiguration(configuration);
       TypeDescriptor<?> inputFormatClass =
@@ -255,27 +249,17 @@ public class HadoopInputFormatIO {
       return builder.build();
     }
 
-    /**
-     * Returns a new {@link HadoopInputFormatIO.Read} that will transform the keys read from the
-     * source using the given key translation function.
-     *
-     * <p>Does not modify this object.
-     */
+    /** Transforms the keys read from the source using the given key translation function. */
     public Read<K, V> withKeyTranslation(SimpleFunction<?, K> function) {
-      checkNotNull(function, "function");
+      checkArgument(function != null, "function can not be null");
       // Sets key class to key translation function's output class type.
       return toBuilder().setKeyTranslationFunction(function)
           .setKeyTypeDescriptor((TypeDescriptor<K>) function.getOutputTypeDescriptor()).build();
     }
 
-    /**
-     * Returns a new {@link HadoopInputFormatIO.Read} that will transform the values read from the
-     * source using the given value translation function.
-     *
-     * <p>Does not modify this object.
-     */
+    /** Transforms the values read from the source using the given value translation function. */
     public Read<K, V> withValueTranslation(SimpleFunction<?, V> function) {
-      checkNotNull(function, "function");
+      checkArgument(function != null, "function can not be null");
       // Sets value class to value translation function's output class type.
       return toBuilder().setValueTranslationFunction(function)
           .setValueTypeDescriptor((TypeDescriptor<V>) function.getOutputTypeDescriptor()).build();
@@ -302,12 +286,14 @@ public class HadoopInputFormatIO {
      * key and value classes are provided in the Hadoop configuration.
      */
     private void validateConfiguration(Configuration configuration) {
-      checkNotNull(configuration, "configuration");
-      checkNotNull(configuration.get("mapreduce.job.inputformat.class"),
-          "configuration.get(\"mapreduce.job.inputformat.class\")");
-      checkNotNull(configuration.get("key.class"), "configuration.get(\"key.class\")");
-      checkNotNull(configuration.get("value.class"),
-          "configuration.get(\"value.class\")");
+      checkArgument(configuration != null, "configuration can not be null");
+      checkArgument(
+          configuration.get("mapreduce.job.inputformat.class") != null,
+          "Configuration must contain \"mapreduce.job.inputformat.class\"");
+      checkArgument(
+          configuration.get("key.class") != null, "Configuration must contain \"key.class\"");
+      checkArgument(
+          configuration.get("value.class") != null, "Configuration must contain \"value.class\"");
     }
 
     /**
@@ -315,7 +301,7 @@ public class HadoopInputFormatIO {
      */
     @VisibleForTesting
     void validateTransform() {
-      checkNotNull(getConfiguration(), "getConfiguration()");
+      checkArgument(getConfiguration() != null, "withConfiguration() is required");
       // Validate that the key translation input type must be same as key class of InputFormat.
       validateTranslationFunction(getinputFormatKeyClass(), getKeyTranslationFunction(),
           "Key translation's input type is not same as hadoop InputFormat : %s key class : %s");
@@ -422,9 +408,9 @@ public class HadoopInputFormatIO {
 
     @Override
     public void validate() {
-      checkNotNull(conf, "conf");
-      checkNotNull(keyCoder, "keyCoder");
-      checkNotNull(valueCoder, "valueCoder");
+      checkArgument(conf != null, "conf can not be null");
+      checkArgument(keyCoder != null, "keyCoder can not be null");
+      checkArgument(valueCoder != null, "valueCoder can not be null");
     }
 
     @Override

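The checkNotNull to checkArgument conversions in this file change the thrown type from NullPointerException to IllegalArgumentException, which is why the test expectations below change in lockstep. A dependency-free sketch of the two styles:

    import static com.google.common.base.Preconditions.checkArgument;
    import static com.google.common.base.Preconditions.checkNotNull;

    public class PreconditionStyles {
      public static void main(String[] args) {
        Object configuration = null;
        try {
          checkNotNull(configuration, "configuration"); // old style
        } catch (NullPointerException e) {
          System.out.println("old style throws: " + e);
        }
        // New style: IllegalArgumentException with a full sentence.
        checkArgument(configuration != null, "configuration can not be null");
      }
    }
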
http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
index 9ec3838..a474744 100644
--- a/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
+++ b/sdks/java/io/hadoop/input-format/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HadoopInputFormatIOTest.java
@@ -161,7 +161,7 @@ public class HadoopInputFormatIOTest {
    */
   @Test
   public void testReadObjectCreationFailsIfConfigurationIsNull() {
-    thrown.expect(NullPointerException.class);
+    thrown.expect(IllegalArgumentException.class);
     HadoopInputFormatIO.<Text, Employee>read()
           .withConfiguration(null);
   }
@@ -192,7 +192,7 @@ public class HadoopInputFormatIOTest {
    */
   @Test
   public void testReadObjectCreationFailsIfKeyTranslationFunctionIsNull() {
-    thrown.expect(NullPointerException.class);
+    thrown.expect(IllegalArgumentException.class);
     HadoopInputFormatIO.<String, Employee>read()
         .withConfiguration(serConf.get())
         .withKeyTranslation(null);
@@ -225,7 +225,7 @@ public class HadoopInputFormatIOTest {
    */
   @Test
   public void testReadObjectCreationFailsIfValueTranslationFunctionIsNull() {
-    thrown.expect(NullPointerException.class);
+    thrown.expect(IllegalArgumentException.class);
     HadoopInputFormatIO.<Text, String>read()
         .withConfiguration(serConf.get())
         .withValueTranslation(null);
@@ -278,7 +278,7 @@ public class HadoopInputFormatIOTest {
   @Test
   public void testReadValidationFailsMissingConfiguration() {
     HadoopInputFormatIO.Read<String, String> read = HadoopInputFormatIO.<String, String>read();
-    thrown.expect(NullPointerException.class);
+    thrown.expect(IllegalArgumentException.class);
     read.validateTransform();
   }
 
@@ -292,7 +292,7 @@ public class HadoopInputFormatIOTest {
     Configuration configuration = new Configuration();
     configuration.setClass("key.class", Text.class, Object.class);
     configuration.setClass("value.class", Employee.class, Object.class);
-    thrown.expect(NullPointerException.class);
+    thrown.expect(IllegalArgumentException.class);
     HadoopInputFormatIO.<Text, Employee>read()
         .withConfiguration(configuration);
   }
@@ -307,7 +307,7 @@ public class HadoopInputFormatIOTest {
     configuration.setClass("mapreduce.job.inputformat.class", EmployeeInputFormat.class,
         InputFormat.class);
     configuration.setClass("value.class", Employee.class, Object.class);
-    thrown.expect(NullPointerException.class);
+    thrown.expect(IllegalArgumentException.class);
     HadoopInputFormatIO.<Text, Employee>read()
         .withConfiguration(configuration);
   }
@@ -322,7 +322,7 @@ public class HadoopInputFormatIOTest {
     configuration.setClass("mapreduce.job.inputformat.class", EmployeeInputFormat.class,
         InputFormat.class);
     configuration.setClass("key.class", Text.class, Object.class);
-    thrown.expect(NullPointerException.class);
+    thrown.expect(IllegalArgumentException.class);
     HadoopInputFormatIO.<Text, Employee>read().withConfiguration(configuration);
   }
 

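For context on the partial-configuration tests above: Hadoop's Configuration returns null for any key that was never set, so omitting one setClass(...) call leaves exactly the key that validateConfiguration rejects. A small sketch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;

    public class ConfigurationKeysDemo {
      public static void main(String[] args) {
        Configuration configuration = new Configuration();
        configuration.setClass("key.class", Text.class, Object.class);
        // Never set, so get(...) returns null and the new checkArgument in
        // validateConfiguration fails with IllegalArgumentException:
        System.out.println(configuration.get("mapreduce.job.inputformat.class")); // null
        System.out.println(configuration.get("key.class")); // org.apache.hadoop.io.Text
      }
    }
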
http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 41ced93..393402a 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -159,78 +159,69 @@ public class HBaseIO {
     return new Read(null, "", new SerializableScan(new Scan()));
   }
 
-  /**
-   * A {@link PTransform} that reads from HBase. See the class-level Javadoc on {@link HBaseIO} for
-   * more information.
-   *
-   * @see HBaseIO
-   */
-  public static class Read extends PTransform<PBegin, PCollection<Result>> {
-    /**
-     * Returns a new {@link HBaseIO.Read} that will read from the HBase instance indicated by the
-     * given configuration.
-     */
-    public Read withConfiguration(Configuration configuration) {
-      checkNotNull(configuration, "conf");
-      return new Read(new SerializableConfiguration(configuration), tableId, serializableScan);
-    }
-
     /**
-     * Returns a new {@link HBaseIO.Read} that will read from the specified table.
+     * A {@link PTransform} that reads from HBase. See the class-level Javadoc on
+     * {@link HBaseIO} for more information.
      *
-     * <p>Does not modify this object.
+     * @see HBaseIO
      */
-    public Read withTableId(String tableId) {
-      checkNotNull(tableId, "tableId");
-      return new Read(serializableConfiguration, tableId, serializableScan);
-    }
+    public static class Read extends PTransform<PBegin, PCollection<Result>> {
+        /** Reads from the HBase instance indicated by the given configuration. */
+        public Read withConfiguration(Configuration configuration) {
+            checkArgument(configuration != null, "configuration can not be null");
+            return new Read(new SerializableConfiguration(configuration),
+                    tableId, serializableScan);
+        }
 
-    /**
-     * Returns a new {@link HBaseIO.Read} that will filter the rows read from HBase using the given
-     * scan.
-     *
-     * <p>Does not modify this object.
-     */
-    public Read withScan(Scan scan) {
-      checkNotNull(scan, "scan");
-      return new Read(serializableConfiguration, tableId, new SerializableScan(scan));
-    }
+        /** Reads from the specified table. */
 
-    /**
-     * Returns a new {@link HBaseIO.Read} that will filter the rows read from HBase using the given
-     * row filter.
-     *
-     * <p>Does not modify this object.
-     */
-    public Read withFilter(Filter filter) {
-      checkNotNull(filter, "filter");
-      return withScan(serializableScan.get().setFilter(filter));
-    }
+        public Read withTableId(String tableId) {
+            checkArgument(tableId != null, "tableId can not be null");
+            return new Read(serializableConfiguration, tableId, serializableScan);
+        }
 
-    /**
-     * Returns a new {@link HBaseIO.Read} that will read only rows in the specified range.
-     *
-     * <p>Does not modify this object.
-     */
-    public Read withKeyRange(ByteKeyRange keyRange) {
-      checkNotNull(keyRange, "keyRange");
-      byte[] startRow = keyRange.getStartKey().getBytes();
-      byte[] stopRow = keyRange.getEndKey().getBytes();
-      return withScan(serializableScan.get().setStartRow(startRow).setStopRow(stopRow));
-    }
+        /** Filters the rows read from HBase using the given scan. */
 
-    /**
-     * Returns a new {@link HBaseIO.Read} that will read only rows in the specified range.
-     *
-     * <p>Does not modify this object.
-     */
-    public Read withKeyRange(byte[] startRow, byte[] stopRow) {
-      checkNotNull(startRow, "startRow");
-      checkNotNull(stopRow, "stopRow");
-      ByteKeyRange keyRange =
-          ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow));
-      return withKeyRange(keyRange);
-    }
+        public Read withScan(Scan scan) {
+            checkArgument(scan != null, "scan can not be null");
+            return new Read(serializableConfiguration, tableId, new SerializableScan(scan));
+        }
+
+        /** Filters the rows read from HBase using the given row filter. */
+
+        public Read withFilter(Filter filter) {
+            checkArgument(filter != null, "filter can not be null");
+            return withScan(serializableScan.get().setFilter(filter));
+        }
+
+        /** Reads only rows in the specified range. */
+        public Read withKeyRange(ByteKeyRange keyRange) {
+            checkArgument(keyRange != null, "keyRange can not be null");
+            byte[] startRow = keyRange.getStartKey().getBytes();
+            byte[] stopRow = keyRange.getEndKey().getBytes();
+            return withScan(serializableScan.get().setStartRow(startRow).setStopRow(stopRow));
+        }
+
+        /** Reads only rows in the specified range. */
+        public Read withKeyRange(byte[] startRow, byte[] stopRow) {
+            checkArgument(startRow != null, "startRow can not be null");
+            checkArgument(stopRow != null, "stopRow can not be null");
+            ByteKeyRange keyRange =
+                    ByteKeyRange.of(ByteKey.copyFrom(startRow), ByteKey.copyFrom(stopRow));
+            return withKeyRange(keyRange);
+        }
 
     private Read(
         SerializableConfiguration serializableConfiguration,
@@ -241,25 +232,22 @@ public class HBaseIO {
       this.serializableScan = serializableScan;
     }
 
-    @Override
-    public PCollection<Result> expand(PBegin input) {
-      HBaseSource source = new HBaseSource(this, null /* estimatedSizeBytes */);
-      return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
-    }
-
-    @Override
-    public void validate(PipelineOptions options) {
-      checkArgument(serializableConfiguration != null, "Configuration not provided");
-      checkArgument(!tableId.isEmpty(), "Table ID not specified");
-      try (Connection connection =
-          ConnectionFactory.createConnection(serializableConfiguration.get())) {
-        Admin admin = connection.getAdmin();
-        checkArgument(
-            admin.tableExists(TableName.valueOf(tableId)), "Table %s does not exist", tableId);
-      } catch (IOException e) {
-        LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
-      }
-    }
+        @Override
+        public PCollection<Result> expand(PBegin input) {
+            checkArgument(serializableConfiguration != null,
+                    "withConfiguration() is required");
+            checkArgument(!tableId.isEmpty(), "withTableId() is required");
+            try (Connection connection = ConnectionFactory.createConnection(
+                    serializableConfiguration.get())) {
+                Admin admin = connection.getAdmin();
+                checkArgument(admin.tableExists(TableName.valueOf(tableId)),
+                        "Table %s does not exist", tableId);
+            } catch (IOException e) {
+                LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
+            }
+            HBaseSource source = new HBaseSource(this, null /* estimatedSizeBytes */);
+                return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
+        }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
@@ -609,58 +597,50 @@ public class HBaseIO {
     return new Write(null /* SerializableConfiguration */, "");
   }
 
-  /**
-   * A {@link PTransform} that writes to HBase. See the class-level Javadoc on {@link HBaseIO} for
-   * more information.
-   *
-   * @see HBaseIO
-   */
-  public static class Write extends PTransform<PCollection<Mutation>, PDone> {
     /**
-     * Returns a new {@link HBaseIO.Write} that will write to the HBase instance indicated by the
-     * given Configuration, and using any other specified customizations.
+     * A {@link PTransform} that writes to HBase. See the class-level Javadoc on
+     * {@link HBaseIO} for more information.
      *
-     * <p>Does not modify this object.
+     * @see HBaseIO
      */
-    public Write withConfiguration(Configuration configuration) {
-      checkNotNull(configuration, "conf");
-      return new Write(new SerializableConfiguration(configuration), tableId);
-    }
+    public static class Write extends PTransform<PCollection<Mutation>, PDone> {
+        /** Writes to the HBase instance indicated by the given Configuration. */
+        public Write withConfiguration(Configuration configuration) {
+            checkArgument(configuration != null, "configuration can not be null");
+            return new Write(new SerializableConfiguration(configuration), tableId);
+        }
 
-    /**
-     * Returns a new {@link HBaseIO.Write} that will write to the specified table.
-     *
-     * <p>Does not modify this object.
-     */
-    public Write withTableId(String tableId) {
-      checkNotNull(tableId, "tableId");
-      return new Write(serializableConfiguration, tableId);
-    }
+        /** Writes to the specified table. */
+        public Write withTableId(String tableId) {
+            checkArgument(tableId != null, "tableId can not be null");
+            return new Write(serializableConfiguration, tableId);
+        }
 
     private Write(SerializableConfiguration serializableConfiguration, String tableId) {
       this.serializableConfiguration = serializableConfiguration;
       this.tableId = tableId;
     }
 
-    @Override
-    public PDone expand(PCollection<Mutation> input) {
-      input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration)));
-      return PDone.in(input.getPipeline());
-    }
-
-    @Override
-    public void validate(PipelineOptions options) {
-      checkArgument(serializableConfiguration != null, "Configuration not specified");
-      checkArgument(!tableId.isEmpty(), "Table ID not specified");
-      try (Connection connection =
-          ConnectionFactory.createConnection(serializableConfiguration.get())) {
-        Admin admin = connection.getAdmin();
-        checkArgument(
-            admin.tableExists(TableName.valueOf(tableId)), "Table %s does not exist", tableId);
-      } catch (IOException e) {
-        LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
-      }
-    }
+        @Override
+        public PDone expand(PCollection<Mutation> input) {
+            checkArgument(serializableConfiguration != null, "withConfiguration() is required");
+            checkArgument(tableId != null && !tableId.isEmpty(), "withTableId() is required");
+            try (Connection connection = ConnectionFactory.createConnection(
+                    serializableConfiguration.get())) {
+                Admin admin = connection.getAdmin();
+                checkArgument(admin.tableExists(TableName.valueOf(tableId)),
+                        "Table %s does not exist", tableId);
+            } catch (IOException e) {
+                LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
+            }
+            input.apply(ParDo.of(new HBaseWriterFn(tableId, serializableConfiguration)));
+            return PDone.in(input.getPipeline());
+        }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {

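The checks formerly in validate(PipelineOptions) now run at the top of expand(), and the table-existence probe keeps its best-effort behavior: an IOException while asking the cluster only logs a warning, and expansion fails only on a definitive "table missing" answer. Reduced to a dependency-free sketch (tableExists is a stand-in for connection.getAdmin().tableExists(...)):

    import static com.google.common.base.Preconditions.checkArgument;

    import java.io.IOException;

    public class BestEffortExistenceCheck {
      // Simulates an unreachable cluster.
      static boolean tableExists(String tableId) throws IOException {
        throw new IOException("cluster unreachable");
      }

      public static void main(String[] args) {
        String tableId = "users";
        try {
          checkArgument(tableExists(tableId), "Table %s does not exist", tableId);
        } catch (IOException e) {
          // Connectivity problems do not fail expansion; only a definitive
          // negative answer does.
          System.err.printf("Error checking whether table %s exists; proceeding.%n", tableId);
        }
      }
    }
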
http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index e6f7ac4..73ba64b 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -151,21 +151,21 @@ public class HBaseIOTest {
   public void testWriteValidationFailsMissingTable() {
     HBaseIO.Write write = HBaseIO.write().withConfiguration(conf);
     thrown.expect(IllegalArgumentException.class);
-    write.validate(null /* input */);
+    write.expand(null /* input */);
   }
 
   @Test
   public void testWriteValidationFailsMissingConfiguration() {
     HBaseIO.Write write = HBaseIO.write().withTableId("table");
     thrown.expect(IllegalArgumentException.class);
-    write.validate(null /* input */);
+    write.expand(null /* input */);
   }
 
   /** Tests that when reading from a non-existent table, the read fails. */
   @Test
   public void testReadingFailsTableDoesNotExist() throws Exception {
     final String table = "TEST-TABLE-INVALID";
-    // Exception will be thrown by read.validate() when read is applied.
+    // Exception will be thrown by read.expand() when read is applied.
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(String.format("Table %s does not exist", table));
     runReadTest(HBaseIO.read().withConfiguration(conf).withTableId(table), new ArrayList<Result>());
@@ -355,14 +355,14 @@ public class HBaseIOTest {
   public void testWritingFailsTableDoesNotExist() throws Exception {
     final String table = "TEST-TABLE-DOES-NOT-EXIST";
 
-    p.apply(Create.empty(HBaseMutationCoder.of()))
-        .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
 
-    // Exception will be thrown by write.validate() when write is applied.
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage(String.format("Table %s does not exist", table));
-    p.run();
-  }
+    // Exception will be thrown by write.expand() when write is applied.
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(String.format("Table %s does not exist", table));
+    p.apply(Create.empty(HBaseMutationCoder.of()))
+        .apply("write", HBaseIO.write().withConfiguration(conf).withTableId(table));
+  }
 
   /** Tests that when writing an element fails, the write fails. */
   @Test

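The reordering in testWritingFailsTableDoesNotExist works because applying a transform expands it immediately, during pipeline construction, so the expectations must be registered before apply() and p.run() is never reached. A self-contained sketch of that eager behavior (FailingWrite is illustrative, not part of HBaseIO):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PDone;

    public class EagerExpandDemo {
      static class FailingWrite extends PTransform<PCollection<String>, PDone> {
        @Override
        public PDone expand(PCollection<String> input) {
          throw new IllegalArgumentException("withTableId() is required");
        }
      }

      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
        // Throws during construction; a p.run() after this line would never
        // be reached.
        p.apply(Create.of("a", "b")).apply("write", new FailingWrite());
      }
    }
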
http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
index d8e462b..8ff9a28 100644
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io.hcatalog;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
@@ -180,13 +179,10 @@ public class HCatalogIO {
 
     @Override
     public PCollection<HCatRecord> expand(PBegin input) {
-      return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedHCatalogSource(this)));
-    }
+      checkArgument(getTable() != null, "withTable() is required");
+      checkArgument(getConfigProperties() != null, "withConfigProperties() is required");
 
-    @Override
-    public void validate(PipelineOptions options) {
-      checkNotNull(getTable(), "table");
-      checkNotNull(getConfigProperties(), "configProperties");
+      return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedHCatalogSource(this)));
     }
 
     @Override
@@ -215,11 +211,6 @@ public class HCatalogIO {
     }
 
     @Override
-    public void validate() {
-      spec.validate(null);
-    }
-
-    @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       spec.populateDisplayData(builder);
     }
@@ -396,16 +387,12 @@ public class HCatalogIO {
 
     @Override
     public PDone expand(PCollection<HCatRecord> input) {
+      checkArgument(getConfigProperties() != null, "withConfigProperties() is required");
+      checkArgument(getTable() != null, "withTable() is required");
       input.apply(ParDo.of(new WriteFn(this)));
       return PDone.in(input.getPipeline());
     }
 
-    @Override
-    public void validate(PipelineOptions options) {
-      checkNotNull(getConfigProperties(), "configProperties");
-      checkNotNull(getTable(), "table");
-    }
-
     private static class WriteFn extends DoFn<HCatRecord, Void> {
       private final Write spec;
       private WriterContext writerContext;

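Across these IOs the new messages name the builder method the caller forgot (withTable(), withConfigProperties()) rather than the internal getter that happened to be null. A minimal sketch of the convention, with illustrative names:

    import static com.google.common.base.Preconditions.checkArgument;

    public class RequiredSettingDemo {
      private String table; // set via withTable(...)

      RequiredSettingDemo withTable(String table) {
        this.table = table;
        return this;
      }

      void expand() {
        // Points the user at the fix, not at the internals:
        checkArgument(table != null, "withTable() is required");
      }

      public static void main(String[] args) {
        new RequiredSettingDemo().withTable("mytable").expand(); // passes
        new RequiredSettingDemo().expand(); // IllegalArgumentException: withTable() is required
      }
    }
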
http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
index 91671a5..dc53c84 100644
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java
@@ -175,19 +175,19 @@ public class HCatalogIOTest implements Serializable {
   /** Test of Write without specifying a table. */
   @Test
   public void testWriteFailureValidationTable() throws Exception {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage(containsString("table"));
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("withTable() is required");
     HCatalogIO.write()
         .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
-        .validate(null);
+        .expand(null);
   }
 
   /** Test of Write without specifying configuration properties. */
   @Test
   public void testWriteFailureValidationConfigProp() throws Exception {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage(containsString("configProperties"));
-    HCatalogIO.write().withTable("myowntable").validate(null);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("withConfigProperties() is required");
+    HCatalogIO.write().withTable("myowntable").expand(null);
   }
 
   /** Test of Read from a non-existent table. */
@@ -204,19 +204,19 @@ public class HCatalogIOTest implements Serializable {
   /** Test of Read without specifying configuration properties. */
   @Test
   public void testReadFailureValidationConfig() throws Exception {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage(containsString("configProperties"));
-    HCatalogIO.read().withTable("myowntable").validate(null);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("withConfigProperties() is required");
+    HCatalogIO.read().withTable("myowntable").expand(null);
   }
 
   /** Test of Read without specifying a table. */
   @Test
   public void testReadFailureValidationTable() throws Exception {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage(containsString("table"));
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("withTable() is required");
     HCatalogIO.read()
         .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf()))
-        .validate(null);
+        .expand(null);
   }
 
   /** Test of Read using SourceTestUtils.readFromSource(..). */

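The containsString(...) matchers could be dropped because ExpectedException.expectMessage(String) already performs substring matching; the updated tests simply pin a longer, more specific fragment. A self-contained JUnit 4 sketch:

    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.rules.ExpectedException;

    public class ExpectMessageDemo {
      @Rule public ExpectedException thrown = ExpectedException.none();

      @Test
      public void expectMessageMatchesSubstrings() {
        thrown.expect(IllegalArgumentException.class);
        thrown.expectMessage("withTable() is required"); // substring match
        // The arbitrary prefix shows that only containment is checked:
        throw new IllegalArgumentException("some prefix: withTable() is required");
      }
    }
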
http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index dc30b2d..14b766e 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io.jdbc;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;
@@ -31,7 +30,6 @@ import javax.annotation.Nullable;
 import javax.sql.DataSource;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -205,20 +203,16 @@ public class JdbcIO {
     }
 
     public static DataSourceConfiguration create(DataSource dataSource) {
-      checkArgument(dataSource != null, "DataSourceConfiguration.create(dataSource) called with "
-          + "null data source");
-      checkArgument(dataSource instanceof Serializable,
-          "DataSourceConfiguration.create(dataSource) called with a dataSource not Serializable");
+      checkArgument(dataSource != null, "dataSource can not be null");
+      checkArgument(dataSource instanceof Serializable, "dataSource must be Serializable");
       return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
           .setDataSource(dataSource)
           .build();
     }
 
     public static DataSourceConfiguration create(String driverClassName, String url) {
-      checkArgument(driverClassName != null,
-          "DataSourceConfiguration.create(driverClassName, url) called with null driverClassName");
-      checkArgument(url != null,
-          "DataSourceConfiguration.create(driverClassName, url) called with null url");
+      checkArgument(driverClassName != null, "driverClassName can not be null");
+      checkArgument(url != null, "url can not be null");
       return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
           .setDriverClassName(driverClassName)
           .setUrl(url)
@@ -241,9 +235,7 @@ public class JdbcIO {
      * {@link #withPassword(String)}, so they do not need to be included here.
      */
     public DataSourceConfiguration withConnectionProperties(String connectionProperties) {
-      checkArgument(connectionProperties != null, "DataSourceConfiguration.create(driver, url)"
-          + ".withConnectionProperties(connectionProperties) "
-          + "called with null connectionProperties");
+      checkArgument(connectionProperties != null, "connectionProperties can not be null");
       return builder().setConnectionProperties(connectionProperties).build();
     }
 
@@ -305,41 +297,43 @@ public class JdbcIO {
     }
 
     public Read<T> withDataSourceConfiguration(DataSourceConfiguration configuration) {
-      checkArgument(configuration != null, "JdbcIO.read().withDataSourceConfiguration"
-          + "(configuration) called with null configuration");
+      checkArgument(configuration != null, "configuration can not be null");
       return toBuilder().setDataSourceConfiguration(configuration).build();
     }
 
     public Read<T> withQuery(String query) {
-      checkArgument(query != null, "JdbcIO.read().withQuery(query) called with null query");
+      checkArgument(query != null, "query can not be null");
       return withQuery(ValueProvider.StaticValueProvider.of(query));
     }
 
     public Read<T> withQuery(ValueProvider<String> query) {
-      checkArgument(query != null, "JdbcIO.read().withQuery(query) called with null query");
+      checkArgument(query != null, "query can not be null");
       return toBuilder().setQuery(query).build();
     }
 
     public Read<T> withStatementPreparator(StatementPreparator statementPreparator) {
-      checkArgument(statementPreparator != null,
-          "JdbcIO.read().withStatementPreparator(statementPreparator) called "
-              + "with null statementPreparator");
+      checkArgument(statementPreparator != null, "statementPreparator can not be null");
       return toBuilder().setStatementPreparator(statementPreparator).build();
     }
 
     public Read<T> withRowMapper(RowMapper<T> rowMapper) {
-      checkArgument(rowMapper != null,
-          "JdbcIO.read().withRowMapper(rowMapper) called with null rowMapper");
+      checkArgument(rowMapper != null, "rowMapper can not be null");
       return toBuilder().setRowMapper(rowMapper).build();
     }
 
     public Read<T> withCoder(Coder<T> coder) {
-      checkArgument(coder != null, "JdbcIO.read().withCoder(coder) called with null coder");
+      checkArgument(coder != null, "coder can not be null");
       return toBuilder().setCoder(coder).build();
     }
 
     @Override
     public PCollection<T> expand(PBegin input) {
+      checkArgument(getQuery() != null, "withQuery() is required");
+      checkArgument(getRowMapper() != null, "withRowMapper() is required");
+      checkArgument(getCoder() != null, "withCoder() is required");
+      checkArgument(
+          getDataSourceConfiguration() != null, "withDataSourceConfiguration() is required");
+
       return input
           .apply(Create.of((Void) null))
           .apply(
@@ -361,19 +355,6 @@ public class JdbcIO {
     }
 
     @Override
-    public void validate(PipelineOptions options) {
-      checkState(getQuery() != null,
-          "JdbcIO.read() requires a query to be set via withQuery(query)");
-      checkState(getRowMapper() != null,
-          "JdbcIO.read() requires a rowMapper to be set via withRowMapper(rowMapper)");
-      checkState(getCoder() != null,
-          "JdbcIO.read() requires a coder to be set via withCoder(coder)");
-      checkState(getDataSourceConfiguration() != null,
-          "JdbcIO.read() requires a DataSource configuration to be set via "
-              + "withDataSourceConfiguration(dataSourceConfiguration)");
-    }
-
-    @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       builder.add(DisplayData.item("query", getQuery()));
@@ -460,19 +441,6 @@ public class JdbcIO {
     }
 
     @Override
-    public void validate(PipelineOptions options) {
-      checkState(getQuery() != null,
-          "JdbcIO.read() requires a query to be set via withQuery(query)");
-      checkState(getRowMapper() != null,
-          "JdbcIO.read() requires a rowMapper to be set via withRowMapper(rowMapper)");
-      checkState(getCoder() != null,
-          "JdbcIO.read() requires a coder to be set via withCoder(coder)");
-      checkState(getDataSourceConfiguration() != null,
-          "JdbcIO.read() requires a DataSource configuration to be set via "
-              + "withDataSourceConfiguration(dataSourceConfiguration)");
-    }
-
-    @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       builder.add(DisplayData.item("query", getQuery()));
@@ -568,22 +536,16 @@ public class JdbcIO {
 
     @Override
     public PDone expand(PCollection<T> input) {
+      checkArgument(
+          getDataSourceConfiguration() != null, "withDataSourceConfiguration() is required");
+      checkArgument(getStatement() != null, "withStatement() is required");
+      checkArgument(
+          getPreparedStatementSetter() != null, "withPreparedStatementSetter() is required");
+
       input.apply(ParDo.of(new WriteFn<T>(this)));
       return PDone.in(input.getPipeline());
     }
 
-    @Override
-    public void validate(PipelineOptions options) {
-      checkArgument(getDataSourceConfiguration() != null,
-          "JdbcIO.write() requires a configuration to be set via "
-              + ".withDataSourceConfiguration(configuration)");
-      checkArgument(getStatement() != null,
-          "JdbcIO.write() requires a statement to be set via .withStatement(statement)");
-      checkArgument(getPreparedStatementSetter() != null,
-          "JdbcIO.write() requires a preparedStatementSetter to be set via "
-              + ".withPreparedStatementSetter(preparedStatementSetter)");
-    }
-
     private static class WriteFn<T> extends DoFn<T, Void> {
       private static final int DEFAULT_BATCH_SIZE = 1000;
 

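JdbcIO now validates in two tiers: each with*() method rejects a null argument immediately, while expand() rejects a configuration that is missing a required setting. A dependency-free sketch of the two tiers (names are illustrative, not JdbcIO's):

    import static com.google.common.base.Preconditions.checkArgument;

    public class TwoTierValidation {
      private String query;

      TwoTierValidation withQuery(String query) {
        checkArgument(query != null, "query can not be null"); // tier 1: bad argument
        this.query = query;
        return this;
      }

      void expand() {
        checkArgument(query != null, "withQuery() is required"); // tier 2: missing setting
      }

      public static void main(String[] args) {
        new TwoTierValidation().withQuery("SELECT 1").expand(); // passes
        new TwoTierValidation().expand(); // IllegalArgumentException: withQuery() is required
      }
    }
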
http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index 2af0ce9..b3a9c8b 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io.jms;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
@@ -165,8 +164,7 @@ public class JmsIO {
      * @return The corresponding {@link JmsIO.Read}.
      */
     public Read withConnectionFactory(ConnectionFactory connectionFactory) {
-      checkArgument(connectionFactory != null, "withConnectionFactory(connectionFactory) called"
-          + " with null connectionFactory");
+      checkArgument(connectionFactory != null, "connectionFactory can not be null");
       return builder().setConnectionFactory(connectionFactory).build();
     }
 
@@ -189,7 +187,7 @@ public class JmsIO {
      * @return The corresponding {@link JmsIO.Read}.
      */
     public Read withQueue(String queue) {
-      checkArgument(queue != null, "withQueue(queue) called with null queue");
+      checkArgument(queue != null, "queue can not be null");
       return builder().setQueue(queue).build();
     }
 
@@ -212,7 +210,7 @@ public class JmsIO {
      * @return The corresponding {@link JmsIO.Read}.
      */
     public Read withTopic(String topic) {
-      checkArgument(topic != null, "withTopic(topic) called with null topic");
+      checkArgument(topic != null, "topic can not be null");
       return builder().setTopic(topic).build();
     }
 
@@ -220,8 +218,7 @@ public class JmsIO {
      * Define the username to connect to the JMS broker (authenticated).
      */
     public Read withUsername(String username) {
-      checkArgument(username != null, "JmsIO.read().withUsername(username) called with null "
-          + "username");
+      checkArgument(username != null, "username can not be null");
       return builder().setUsername(username).build();
     }
 
@@ -229,8 +226,7 @@ public class JmsIO {
      * Define the password to connect to the JMS broker (authenticated).
      */
     public Read withPassword(String password) {
-      checkArgument(password != null, "JmsIO.read().withPassword(password) called with null "
-          + "password");
+      checkArgument(password != null, "password can not be null");
       return builder().setPassword(password).build();
     }
 
@@ -251,8 +247,7 @@ public class JmsIO {
      * @return The corresponding {@link JmsIO.Read}.
      */
     public Read withMaxNumRecords(long maxNumRecords) {
-      checkArgument(maxNumRecords >= 0, "withMaxNumRecords(maxNumRecords) called with invalid "
-          + "maxNumRecords");
+      checkArgument(maxNumRecords >= 0, "maxNumRecords must be >= 0, but was: %s", maxNumRecords);
       return builder().setMaxNumRecords(maxNumRecords).build();
     }
 
@@ -273,13 +268,20 @@ public class JmsIO {
      * @return The corresponding {@link JmsIO.Read}.
      */
     public Read withMaxReadTime(Duration maxReadTime) {
-      checkArgument(maxReadTime != null, "withMaxReadTime(maxReadTime) called with null "
-          + "maxReadTime");
+      checkArgument(maxReadTime != null, "maxReadTime can not be null");
       return builder().setMaxReadTime(maxReadTime).build();
     }
 
     @Override
     public PCollection<JmsRecord> expand(PBegin input) {
+      checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required");
+      checkArgument(
+          getQueue() != null || getTopic() != null,
+          "Either withQueue() or withTopic() is required");
+      checkArgument(
+          getQueue() == null || getTopic() == null,
+          "withQueue() and withTopic() are exclusive");
+
       // handles unbounded source to bounded conversion if maxNumRecords is set.
       Unbounded<JmsRecord> unbounded = org.apache.beam.sdk.io.Read.from(createSource());
 
@@ -295,15 +297,6 @@ public class JmsIO {
     }
 
     @Override
-    public void validate(PipelineOptions options) {
-      checkState(getConnectionFactory() != null, "JmsIO.read() requires a JMS connection "
-          + "factory to be set via withConnectionFactory(connectionFactory)");
-      checkState((getQueue() != null || getTopic() != null), "JmsIO.read() requires a JMS "
-          + "destination (queue or topic) to be set via withQueue(queueName) or withTopic"
-          + "(topicName)");
-    }
-
-    @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       builder.addIfNotNull(DisplayData.item("queue", getQueue()));
@@ -363,11 +356,6 @@ public class JmsIO {
     }
 
     @Override
-    public void validate() {
-      spec.validate(null);
-    }
-
-    @Override
     public Coder<JmsCheckpointMark> getCheckpointMarkCoder() {
       return AvroCoder.of(JmsCheckpointMark.class);
     }
@@ -579,8 +567,7 @@ public class JmsIO {
      * @return The corresponding {@link JmsIO.Read}.
      */
     public Write withConnectionFactory(ConnectionFactory connectionFactory) {
-      checkArgument(connectionFactory != null, "withConnectionFactory(connectionFactory) called"
-          + " with null connectionFactory");
+      checkArgument(connectionFactory != null, "connectionFactory can not be null");
       return builder().setConnectionFactory(connectionFactory).build();
     }
 
@@ -603,7 +590,7 @@ public class JmsIO {
      * @return The corresponding {@link JmsIO.Read}.
      */
     public Write withQueue(String queue) {
-      checkArgument(queue != null, "withQueue(queue) called with null queue");
+      checkArgument(queue != null, "queue can not be null");
       return builder().setQueue(queue).build();
     }
 
@@ -626,7 +613,7 @@ public class JmsIO {
      * @return The corresponding {@link JmsIO.Read}.
      */
     public Write withTopic(String topic) {
-      checkArgument(topic != null, "withTopic(topic) called with null topic");
+      checkArgument(topic != null, "topic can not be null");
       return builder().setTopic(topic).build();
     }
 
@@ -634,8 +621,7 @@ public class JmsIO {
      * Define the username to connect to the JMS broker (authenticated).
      */
     public Write withUsername(String username) {
-      checkArgument(username != null,  "JmsIO.write().withUsername(username) called with null "
-          + "username");
+      checkArgument(username != null,  "username can not be null");
       return builder().setUsername(username).build();
     }
 
@@ -643,25 +629,24 @@ public class JmsIO {
      * Define the password to connect to the JMS broker (authenticated).
      */
     public Write withPassword(String password) {
-      checkArgument(password != null, "JmsIO.write().withPassword(password) called with null "
-          + "password");
+      checkArgument(password != null, "password can not be null");
       return builder().setPassword(password).build();
     }
 
     @Override
     public PDone expand(PCollection<String> input) {
+      checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required");
+      checkArgument(
+          getQueue() != null || getTopic() != null,
+          "Either withQueue(queue) or withTopic(topic) is required");
+      checkArgument(
+          getQueue() == null || getTopic() == null,
+          "withQueue(queue) and withTopic(topic) are exclusive");
+
       input.apply(ParDo.of(new WriterFn(this)));
       return PDone.in(input.getPipeline());
     }
 
-    @Override
-    public void validate(PipelineOptions options) {
-      checkState(getConnectionFactory() != null, "JmsIO.write() requires a JMS connection "
-          + "factory to be set via withConnectionFactory(connectionFactory)");
-      checkState((getQueue() != null || getTopic() != null), "JmsIO.write() requires a JMS "
-          + "destination (queue or topic) to be set via withQueue(queue) or withTopic(topic)");
-    }
-
     private static class WriterFn extends DoFn<String, Void> {
 
       private Write spec;

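JmsIO.expand() now enforces that exactly one destination is configured, using a pair of complementary checks. Reduced to a sketch:

    import static com.google.common.base.Preconditions.checkArgument;

    public class ExactlyOneDestination {
      public static void main(String[] args) {
        String queue = "orders";
        String topic = null;
        // Together these two checks enforce "exactly one of queue/topic":
        checkArgument(queue != null || topic != null,
            "Either withQueue(queue) or withTopic(topic) is required");
        checkArgument(queue == null || topic == null,
            "withQueue(queue) and withTopic(topic) are exclusive");
        System.out.println("destination: " + (queue != null ? queue : topic));
      }
    }
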
http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index dae4c1d..aab99c3 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -487,7 +487,7 @@ public class KafkaIO {
      */
     public Read<K, V> withTimestampFn2(
         SerializableFunction<KafkaRecord<K, V>, Instant> timestampFn) {
-      checkNotNull(timestampFn);
+      checkArgument(timestampFn != null, "timestampFn can not be null");
       return toBuilder().setTimestampFn(timestampFn).build();
     }
 
@@ -497,7 +497,7 @@ public class KafkaIO {
      */
     public Read<K, V> withWatermarkFn2(
         SerializableFunction<KafkaRecord<K, V>, Instant> watermarkFn) {
-      checkNotNull(watermarkFn);
+      checkArgument(watermarkFn != null, "watermarkFn can not be null");
       return toBuilder().setWatermarkFn(watermarkFn).build();
     }
 
@@ -505,7 +505,7 @@ public class KafkaIO {
      * A function to assign a timestamp to a record. Default is processing timestamp.
      */
     public Read<K, V> withTimestampFn(SerializableFunction<KV<K, V>, Instant> timestampFn) {
-      checkNotNull(timestampFn);
+      checkArgument(timestampFn != null, "timestampFn can not be null");
       return withTimestampFn2(unwrapKafkaAndThen(timestampFn));
     }
 
@@ -514,7 +514,7 @@ public class KafkaIO {
      * @see #withTimestampFn(SerializableFunction)
      */
     public Read<K, V> withWatermarkFn(SerializableFunction<KV<K, V>, Instant> watermarkFn) {
-      checkNotNull(watermarkFn);
+      checkArgument(watermarkFn != null, "watermarkFn can not be null");
       return withWatermarkFn2(unwrapKafkaAndThen(watermarkFn));
     }
 
@@ -526,13 +526,14 @@ public class KafkaIO {
     }
 
     @Override
-    public void validate(PipelineOptions options) {
-      checkNotNull(getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
-          "Kafka bootstrap servers should be set");
+    public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
+      checkArgument(
+          getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG) != null,
+          "withBootstrapServers() is required");
       checkArgument(getTopics().size() > 0 || getTopicPartitions().size() > 0,
-          "Kafka topics or topic_partitions are required");
-      checkNotNull(getKeyDeserializer(), "Key deserializer must be set");
-      checkNotNull(getValueDeserializer(), "Value deserializer must be set");
+          "Either withTopic(), withTopics() or withTopicPartitions() is required");
+      checkArgument(getKeyDeserializer() != null, "withKeyDeserializer() is required");
+      checkArgument(getValueDeserializer() != null, "withValueDeserializer() is required");
       if (getStartReadTime() != null) {
         checkArgument(new ConsumerSpEL().hasOffsetsForTimes(),
             "Consumer.offsetsForTimes is only supported by Kafka Client 0.10.1.0 onwards, "
@@ -540,25 +541,23 @@ public class KafkaIO {
                 + ". If you are building with maven, set \"kafka.clients.version\" "
                 + "maven property to 0.10.1.0 or newer.");
       }
-    }
 
-    @Override
-    public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
       // Infer key/value coders if not specified explicitly
       CoderRegistry registry = input.getPipeline().getCoderRegistry();
 
       Coder<K> keyCoder =
-          checkNotNull(
-              getKeyCoder() != null ? getKeyCoder() : inferCoder(registry, getKeyDeserializer()),
-              "Key coder could not be inferred from key deserializer. Please provide"
-                  + "key coder explicitly using withKeyDeserializerAndCoder()");
+          getKeyCoder() != null ? getKeyCoder() : inferCoder(registry, getKeyDeserializer());
+      checkArgument(
+          keyCoder != null,
+          "Key coder could not be inferred from key deserializer. Please provide"
+              + "key coder explicitly using withKeyDeserializerAndCoder()");
 
       Coder<V> valueCoder =
-          checkNotNull(
-              getValueCoder() != null ? getValueCoder()
-                  : inferCoder(registry, getValueDeserializer()),
-              "Value coder could not be inferred from value deserializer. Please provide"
-                  + "value coder explicitly using withValueDeserializerAndCoder()");
+          getValueCoder() != null ? getValueCoder() : inferCoder(registry, getValueDeserializer());
+      checkArgument(
+          valueCoder != null,
+          "Value coder could not be inferred from value deserializer. Please provide"
+              + "value coder explicitly using withValueDeserializerAndCoder()");
 
       // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
       Unbounded<KafkaRecord<K, V>> unbounded =
@@ -840,11 +839,6 @@ public class KafkaIO {
     }
 
     @Override
-    public void validate() {
-      spec.validate(null);
-    }
-
-    @Override
     public Coder<KafkaRecord<K, V>> getOutputCoder() {
       return KafkaRecordCoder.of(spec.getKeyCoder(), spec.getValueCoder());
     }
@@ -1488,17 +1482,15 @@ public class KafkaIO {
 
     @Override
     public PDone expand(PCollection<KV<K, V>> input) {
+      checkArgument(
+          getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) != null,
+          "withBootstrapServers() is required");
+      checkArgument(getTopic() != null, "withTopic() is required");
+
       input.apply(ParDo.of(new KafkaWriter<>(this)));
       return PDone.in(input.getPipeline());
     }
 
-    @Override
-    public void validate(PipelineOptions options) {
-      checkNotNull(getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
-          "Kafka bootstrap servers should be set");
-      checkNotNull(getTopic(), "Kafka topic should be set");
-    }
-
     // set config defaults
     private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES =
         ImmutableMap.<String, Object>of(

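With the checks folded into expand(), a misconfigured KafkaIO read now fails
while the pipeline is being constructed, at apply() time, instead of in a
separate validate() pass. A minimal sketch of the new failure mode (not part
of the patch; the topic name is made up):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaReadValidationSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
    try {
      // withBootstrapServers() deliberately omitted: the check now runs
      // inside expand(), so apply() fails during pipeline construction.
      pipeline.apply(
          KafkaIO.<Long, String>read()
              .withTopic("my_topic") // hypothetical topic name
              .withKeyDeserializer(LongDeserializer.class)
              .withValueDeserializer(StringDeserializer.class));
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // "withBootstrapServers() is required"
    }
  }
}
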
http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
index bc8ada1..ef39a91 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io.kinesis;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
@@ -151,8 +150,7 @@ public final class KinesisIO {
     public Read from(String streamName, InitialPositionInStream initialPosition) {
       return toBuilder()
           .setStreamName(streamName)
-          .setInitialPosition(
-              new StartingPoint(checkNotNull(initialPosition, "initialPosition")))
+          .setInitialPosition(new StartingPoint(initialPosition))
           .build();
     }
 
@@ -163,8 +161,7 @@ public final class KinesisIO {
     public Read from(String streamName, Instant initialTimestamp) {
       return toBuilder()
           .setStreamName(streamName)
-          .setInitialPosition(
-              new StartingPoint(checkNotNull(initialTimestamp, "initialTimestamp")))
+          .setInitialPosition(new StartingPoint(initialTimestamp))
           .build();
     }
 
@@ -197,7 +194,7 @@ public final class KinesisIO {
 
     /** Specifies to read records for at most the given duration. */
     public Read withMaxReadTime(Duration maxReadTime) {
-      checkNotNull(maxReadTime, "maxReadTime");
+      checkArgument(maxReadTime != null, "maxReadTime can not be null");
       return toBuilder().setMaxReadTime(maxReadTime).build();
     }
 
@@ -226,9 +223,12 @@ public final class KinesisIO {
       private final Regions region;
 
       private BasicKinesisProvider(String accessKey, String secretKey, Regions region) {
-        this.accessKey = checkNotNull(accessKey, "accessKey");
-        this.secretKey = checkNotNull(secretKey, "secretKey");
-        this.region = checkNotNull(region, "region");
+        checkArgument(accessKey != null, "accessKey can not be null");
+        checkArgument(secretKey != null, "secretKey can not be null");
+        checkArgument(region != null, "region can not be null");
+        this.accessKey = accessKey;
+        this.secretKey = secretKey;
+        this.region = region;
       }
 
       private AWSCredentialsProvider getCredentialsProvider() {

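In BasicKinesisProvider the fields are now validated before assignment, so a
null argument surfaces as IllegalArgumentException instead of the
NullPointerException that checkNotNull would throw. The same
validate-then-assign pattern in isolation (a hypothetical class, not from the
patch):

import static com.google.common.base.Preconditions.checkArgument;

// Hypothetical holder: checkArgument validates each field before
// assignment, so a null surfaces as IllegalArgumentException rather
// than the NullPointerException thrown by checkNotNull.
final class AwsCredentials {
  private final String accessKey;
  private final String secretKey;

  AwsCredentials(String accessKey, String secretKey) {
    checkArgument(accessKey != null, "accessKey can not be null");
    checkArgument(secretKey != null, "secretKey can not be null");
    this.accessKey = accessKey;
    this.secretKey = secretKey;
  }
}
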
http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
index e5c32d2..e2fa474 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
@@ -23,7 +23,6 @@ import static com.google.common.collect.Lists.newArrayList;
 import java.io.IOException;
 import java.util.List;
 import java.util.NoSuchElementException;
-
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.joda.time.Instant;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
index c612d52..4dc2405 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -431,10 +431,6 @@ public class MongoDbGridFSIO {
       }
 
       @Override
-      public void validate() {
-      }
-
-      @Override
       public void populateDisplayData(DisplayData.Builder builder) {
         spec.populateDisplayData(builder);
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
index d29f0ae..9007051 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io.mongodb;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
@@ -205,8 +204,7 @@ public class MongoDbIO {
      * Sets the database to use.
      */
     public Read withDatabase(String database) {
-      checkArgument(database != null, "MongoDbIO.read().withDatabase(database) called with null"
-          + " database");
+      checkArgument(database != null, "database can not be null");
       return builder().setDatabase(database).build();
     }
 
@@ -214,8 +212,7 @@ public class MongoDbIO {
      * Sets the collection to consider in the database.
      */
     public Read withCollection(String collection) {
-      checkArgument(collection != null, "MongoDbIO.read().withCollection(collection) called "
-          + "with null collection");
+      checkArgument(collection != null, "collection can not be null");
       return builder().setCollection(collection).build();
     }
 
@@ -223,8 +220,7 @@ public class MongoDbIO {
      * Sets a filter on the documents in a collection.
      */
     public Read withFilter(String filter) {
-      checkArgument(filter != null, "MongoDbIO.read().withFilter(filter) called with null "
-          + "filter");
+      checkArgument(filter != null, "filter can not be null");
       return builder().setFilter(filter).build();
     }
 
@@ -232,27 +228,19 @@ public class MongoDbIO {
     * Sets the user-defined number of splits.
      */
     public Read withNumSplits(int numSplits) {
-      checkArgument(numSplits >= 0, "MongoDbIO.read().withNumSplits(numSplits) called with "
-          + "invalid number. The number of splits has to be a positive value (currently %d)",
-          numSplits);
+      checkArgument(numSplits >= 0, "invalid num_splits: must be >= 0, but was %d", numSplits);
       return builder().setNumSplits(numSplits).build();
     }
 
     @Override
     public PCollection<Document> expand(PBegin input) {
+      checkArgument(uri() != null, "withUri() is required");
+      checkArgument(database() != null, "withDatabase() is required");
+      checkArgument(collection() != null, "withCollection() is required");
       return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedMongoDbSource(this)));
     }
 
     @Override
-    public void validate(PipelineOptions options) {
-      checkState(uri() != null, "MongoDbIO.read() requires an URI to be set via withUri(uri)");
-      checkState(database() != null, "MongoDbIO.read() requires a database to be set via "
-          + "withDatabase(database)");
-      checkState(collection() != null, "MongoDbIO.read() requires a collection to be set via "
-          + "withCollection(collection)");
-    }
-
-    @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       builder.add(DisplayData.item("uri", uri()));
@@ -282,11 +270,6 @@ public class MongoDbIO {
     }
 
     @Override
-    public void validate() {
-      spec.validate(null);
-    }
-
-    @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       spec.populateDisplayData(builder);
     }
@@ -576,7 +559,7 @@ public class MongoDbIO {
      *   </ul>
      */
     public Write withUri(String uri) {
-      checkArgument(uri != null, "MongoDbIO.write().withUri(uri) called with null uri");
+      checkArgument(uri != null, "uri can not be null");
       return builder().setUri(uri).build();
     }
 
@@ -598,8 +581,7 @@ public class MongoDbIO {
      * Sets the database to use.
      */
     public Write withDatabase(String database) {
-      checkArgument(database != null, "MongoDbIO.write().withDatabase(database) called with "
-          + "null database");
+      checkArgument(database != null, "database can not be null");
       return builder().setDatabase(database).build();
     }
 
@@ -607,8 +589,7 @@ public class MongoDbIO {
     * Sets the collection in the database where data will be written.
      */
     public Write withCollection(String collection) {
-      checkArgument(collection != null, "MongoDbIO.write().withCollection(collection) called "
-          + "with null collection");
+      checkArgument(collection != null, "collection can not be null");
       return builder().setCollection(collection).build();
     }
 
@@ -616,27 +597,21 @@ public class MongoDbIO {
     * Defines the batch size used to group write operations.
      */
     public Write withBatchSize(long batchSize) {
-      checkArgument(batchSize >= 0, "MongoDbIO.write().withBatchSize(batchSize) called with "
-          + "invalid batch size. Batch size has to be >= 0 (currently %d)", batchSize);
+      checkArgument(batchSize >= 0, "Batch size must be >= 0, but was %d", batchSize);
       return builder().setBatchSize(batchSize).build();
     }
 
     @Override
     public PDone expand(PCollection<Document> input) {
+      checkArgument(uri() != null, "withUri() is required");
+      checkArgument(database() != null, "withDatabase() is required");
+      checkArgument(collection() != null, "withCollection() is required");
+
       input.apply(ParDo.of(new WriteFn(this)));
       return PDone.in(input.getPipeline());
     }
 
     @Override
-    public void validate(PipelineOptions options) {
-      checkState(uri() != null, "MongoDbIO.write() requires an URI to be set via withUri(uri)");
-      checkState(database() != null, "MongoDbIO.write() requires a database to be set via "
-          + "withDatabase(database)");
-      checkState(collection() != null, "MongoDbIO.write() requires a collection to be set via "
-          + "withCollection(collection)");
-    }
-
-    @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       builder.add(DisplayData.item("uri", uri()));
       builder.add(DisplayData.item("keepAlive", keepAlive()));

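The same pattern applies to MongoDbIO: a read missing withUri() is now
rejected inside expand(), before any connection to MongoDB is attempted. A
minimal sketch (not part of the patch; database and collection names are made
up):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class MongoReadValidationSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
    try {
      // withUri() deliberately omitted; expand() rejects the transform
      // during pipeline construction, before any I/O happens.
      pipeline.apply(MongoDbIO.read().withDatabase("beam").withCollection("artists"));
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // "withUri() is required"
    }
  }
}
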
http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
index 5aadb80..f9083bb 100644
--- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
+++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java
@@ -149,12 +149,8 @@ public class MqttIO {
      * @return A connection configuration to the MQTT broker.
      */
     public static ConnectionConfiguration create(String serverUri, String topic) {
-      checkArgument(serverUri != null,
-          "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null "
-              + "serverUri");
-      checkArgument(topic != null,
-          "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null "
-              + "topic");
+      checkArgument(serverUri != null, "serverUri can not be null");
+      checkArgument(topic != null, "topic can not be null");
       return new AutoValue_MqttIO_ConnectionConfiguration.Builder().setServerUri(serverUri)
           .setTopic(topic).build();
     }
@@ -168,14 +164,9 @@ public class MqttIO {
      * @return A connection configuration to the MQTT broker.
      */
     public static ConnectionConfiguration create(String serverUri, String topic, String clientId) {
-      checkArgument(serverUri != null,
-          "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null "
-              + "serverUri");
-      checkArgument(topic != null,
-          "MqttIO.ConnectionConfiguration.create(serverUri, topic) called with null "
-              + "topic");
-      checkArgument(clientId != null, "MqttIO.ConnectionConfiguration.create(serverUri,"
-          + "topic, clientId) called with null clientId");
+      checkArgument(serverUri != null, "serverUri can not be null");
+      checkArgument(topic != null, "topic can not be null");
+      checkArgument(clientId != null, "clientId can not be null");
       return new AutoValue_MqttIO_ConnectionConfiguration.Builder().setServerUri(serverUri)
           .setTopic(topic).setClientId(clientId).build();
     }
@@ -242,9 +233,7 @@ public class MqttIO {
     * Defines the MQTT connection configuration used to connect to the MQTT broker.
      */
     public Read withConnectionConfiguration(ConnectionConfiguration configuration) {
-      checkArgument(configuration != null,
-          "MqttIO.read().withConnectionConfiguration(configuration) called with null "
-              + "configuration or not called at all");
+      checkArgument(configuration != null, "configuration can not be null");
       return builder().setConnectionConfiguration(configuration).build();
     }
 
@@ -254,8 +243,6 @@ public class MqttIO {
      * will provide a bounded {@link PCollection}.
      */
     public Read withMaxNumRecords(long maxNumRecords) {
-      checkArgument(maxReadTime() == null,
-          "maxNumRecord and maxReadTime are exclusive");
       return builder().setMaxNumRecords(maxNumRecords).build();
     }
 
@@ -265,13 +252,14 @@ public class MqttIO {
      * {@link PCollection}.
      */
     public Read withMaxReadTime(Duration maxReadTime) {
-      checkArgument(maxNumRecords() == Long.MAX_VALUE,
-          "maxNumRecord and maxReadTime are exclusive");
       return builder().setMaxReadTime(maxReadTime).build();
     }
 
     @Override
     public PCollection<byte[]> expand(PBegin input) {
+      checkArgument(
+          maxReadTime() == null || maxNumRecords() == Long.MAX_VALUE,
+          "withMaxNumRecords() and withMaxReadTime() are exclusive");
 
       org.apache.beam.sdk.io.Read.Unbounded<byte[]> unbounded =
           org.apache.beam.sdk.io.Read.from(new UnboundedMqttSource(this));
@@ -288,11 +276,6 @@ public class MqttIO {
     }
 
     @Override
-    public void validate(PipelineOptions options) {
-      // validation is performed in the ConnectionConfiguration create()
-    }
-
-    @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       connectionConfiguration().populateDisplayData(builder);
@@ -372,11 +355,6 @@ public class MqttIO {
     }
 
     @Override
-    public void validate() {
-      spec.validate(null);
-    }
-
-    @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       spec.populateDisplayData(builder);
     }
@@ -511,9 +489,7 @@ public class MqttIO {
     * Defines the MQTT connection configuration used to connect to the MQTT broker.
      */
     public Write withConnectionConfiguration(ConnectionConfiguration configuration) {
-      checkArgument(configuration != null,
-          "MqttIO.write().withConnectionConfiguration(configuration) called with null "
-              + "configuration or not called at all");
+      checkArgument(configuration != null, "configuration can not be null");
       return builder().setConnectionConfiguration(configuration).build();
     }
 
@@ -538,11 +514,6 @@ public class MqttIO {
     }
 
     @Override
-    public void validate(PipelineOptions options) {
-      // validate is done in connection configuration
-    }
-
-    @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       connectionConfiguration().populateDisplayData(builder);
       builder.add(DisplayData.item("retained", retained()));

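Because the exclusivity check moved out of the setters into expand(),
withMaxNumRecords() and withMaxReadTime() can now be called in either order;
the conflict is reported once, when the transform is applied. A sketch (not
from the patch; broker URI and topic are made up):

import org.apache.beam.sdk.io.mqtt.MqttIO;
import org.joda.time.Duration;

public class MqttReadValidationSketch {
  public static void main(String[] args) {
    // Both limits set: the setters no longer police each other, so this
    // builds fine. expand() rejects the combination when the transform
    // is applied to a pipeline.
    MqttIO.Read read =
        MqttIO.read()
            .withConnectionConfiguration(
                MqttIO.ConnectionConfiguration.create("tcp://localhost:1883", "my_topic"))
            .withMaxNumRecords(1000)
            .withMaxReadTime(Duration.standardMinutes(5));
    // Applying `read` would throw IllegalArgumentException:
    // "withMaxNumRecords() and withMaxReadTime() are exclusive"
  }
}
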
http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java
index c137eea..f811139 100644
--- a/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java
+++ b/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java
@@ -284,15 +284,11 @@ public class SolrIO {
 
     @Override
     public PCollection<SolrDocument> expand(PBegin input) {
-      return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedSolrSource(this, null)));
-    }
+      checkArgument(
+          getConnectionConfiguration() != null, "withConnectionConfiguration() is required");
+      checkArgument(getCollection() != null, "from() is required");
 
-    @Override
-    public void validate(PipelineOptions options) {
-      checkState(
-          getConnectionConfiguration() != null,
-          "Need to set connection configuration using withConnectionConfiguration()");
-      checkState(getCollection() != null, "Need to set collection name using to()");
+      return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedSolrSource(this, null)));
     }
 
     @Override
@@ -464,11 +460,6 @@ public class SolrIO {
     }
 
     @Override
-    public void validate() {
-      spec.validate(null);
-    }
-
-    @Override
     public Coder<SolrDocument> getOutputCoder() {
       return JavaBinCodecCoder.of(SolrDocument.class);
     }
@@ -642,15 +633,12 @@ public class SolrIO {
     }
 
     @Override
-    public void validate(PipelineOptions options) {
+    public PDone expand(PCollection<SolrInputDocument> input) {
       checkState(
           getConnectionConfiguration() != null,
-          "Need to set connection configuration via withConnectionConfiguration()");
-      checkState(getCollection() != null, "Need to set collection name via to()");
-    }
+          "withConnectionConfiguration() is required");
+      checkState(getCollection() != null, "to() is required");
 
-    @Override
-    public PDone expand(PCollection<SolrInputDocument> input) {
       input.apply(ParDo.of(new WriteFn(this)));
       return PDone.in(input.getPipeline());
     }

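As in the other IOs, SolrIO.Read now fails fast inside expand(); note that
Write above keeps checkState, so a misconfigured write surfaces as
IllegalStateException rather than IllegalArgumentException. A sketch (not
from the patch; the collection name is hypothetical). Since the argument
checks run before the input is touched, expand(null) is safe here:

import org.apache.beam.sdk.io.solr.SolrIO;

public class SolrReadValidationSketch {
  public static void main(String[] args) {
    try {
      // No withConnectionConfiguration(): the relocated check fires first.
      SolrIO.read().from("my_collection").expand(null);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // "withConnectionConfiguration() is required"
    }
  }
}
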
http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
index 749da51..47626cd 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlIO.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.sdk.io.xml;
 
-import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.io.FileIO.ReadableFile;
 import org.apache.beam.sdk.io.OffsetBasedSource;
 import org.apache.beam.sdk.io.ReadAllViaFileBasedSource;
 import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -288,18 +287,10 @@ public class XmlIO {
     }
 
     private void validate() {
-      checkNotNull(
-          getRootElement(),
-          "rootElement is null. Use builder method withRootElement() to set this.");
-      checkNotNull(
-          getRecordElement(),
-          "recordElement is null. Use builder method withRecordElement() to set this.");
-      checkNotNull(
-          getRecordClass(),
-          "recordClass is null. Use builder method withRecordClass() to set this.");
-      checkNotNull(
-          getCharset(),
-          "charset is null. Use builder method withCharset() to set this.");
+      checkArgument(getRootElement() != null, "withRootElement() is required");
+      checkArgument(getRecordElement() != null, "withRecordElement() is required");
+      checkArgument(getRecordClass() != null, "withRecordClass() is required");
+      checkArgument(getCharset() != null, "withCharset() is required");
     }
 
     @Override
@@ -595,20 +586,17 @@ public class XmlIO {
     }
 
     @Override
-    public void validate(PipelineOptions options) {
-      checkNotNull(getRecordClass(), "Missing a class to bind to a JAXB context.");
-      checkNotNull(getRootElement(), "Missing a root element name.");
-      checkNotNull(getFilenamePrefix(), "Missing a filename to write to.");
-      checkNotNull(getCharset(), "Missing charset");
+    public PDone expand(PCollection<T> input) {
+      checkArgument(getRecordClass() != null, "withRecordClass() is required");
+      checkArgument(getRootElement() != null, "withRootElement() is required");
+      checkArgument(getFilenamePrefix() != null, "to() is required");
+      checkArgument(getCharset() != null, "withCharset() is required");
       try {
         JAXBContext.newInstance(getRecordClass());
       } catch (JAXBException e) {
         throw new RuntimeException("Error binding classes to a JAXB Context.", e);
       }
-    }
 
-    @Override
-    public PDone expand(PCollection<T> input) {
       input.apply(org.apache.beam.sdk.io.WriteFiles.to(createSink()));
       return PDone.in(input.getPipeline());
     }

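The updated XmlSinkTest further below exercises exactly this path; the same
can be shown directly, since the argument checks run before expand() uses its
input. A sketch (not from the patch), with Bird standing in for any
JAXB-annotated record class:

import javax.xml.bind.annotation.XmlRootElement;
import org.apache.beam.sdk.io.xml.XmlIO;

public class XmlWriteValidationSketch {
  @XmlRootElement
  static class Bird {} // hypothetical JAXB-annotated record type

  public static void main(String[] args) {
    try {
      // withRootElement() deliberately omitted.
      XmlIO.<Bird>write().withRecordClass(Bird.class).to("/tmp/out").expand(null);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // "withRootElement() is required"
    }
  }
}
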
http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
index b663544..b54d95b 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
@@ -29,7 +29,6 @@ import org.apache.beam.sdk.io.DynamicFileDestinations;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.ShardNameTemplate;
 import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.MimeTypes;
@@ -51,15 +50,6 @@ class XmlSink<T> extends FileBasedSink<T, Void, T> {
   }
 
   /**
-   * Validates that the root element, class to bind to a JAXB context, and filenamePrefix have
-   * been set and that the class can be bound in a JAXB context.
-   */
-  @Override
-  public void validate(PipelineOptions options) {
-    spec.validate(null);
-  }
-
-  /**
    * Creates an {@link XmlWriteOperation}.
    */
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/1a38de17/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
index d1584dc..3834abd 100644
--- a/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
+++ b/sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSinkTest.java
@@ -127,25 +127,25 @@ public class XmlSinkTest {
   /** Validation ensures no fields are missing. */
   @Test
   public void testValidateXmlSinkMissingRecordClass() {
-    thrown.expect(NullPointerException.class);
+    thrown.expect(IllegalArgumentException.class);
     XmlIO.<Bird>write()
         .to(testFilePrefix)
         .withRootElement(testRootElement)
-        .validate(null);
+        .expand(null);
   }
 
   @Test
   public void testValidateXmlSinkMissingRootElement() {
-    thrown.expect(NullPointerException.class);
+    thrown.expect(IllegalArgumentException.class);
     XmlIO.<Bird>write().withRecordClass(Bird.class)
         .to(testFilePrefix)
-        .validate(null);
+        .expand(null);
   }
 
   @Test
   public void testValidateXmlSinkMissingOutputDirectory() {
-    thrown.expect(NullPointerException.class);
-    XmlIO.<Bird>write().withRecordClass(Bird.class).withRootElement(testRootElement).validate(null);
+    thrown.expect(IllegalArgumentException.class);
+    XmlIO.<Bird>write().withRecordClass(Bird.class).withRootElement(testRootElement).expand(null);
   }
 
   /**

