beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] beam git commit: [BEAM-2260] Improve construction-time errors for Text and AvroIO
Date Thu, 11 May 2017 19:42:34 GMT
Repository: beam
Updated Branches:
  refs/heads/release-2.0.0 16d281c05 -> c839c1321


[BEAM-2260] Improve construction-time errors for Text and AvroIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3eb24345
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3eb24345
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3eb24345

Branch: refs/heads/release-2.0.0
Commit: 3eb243457520b8e25457416e4645c5a753ce9a0f
Parents: 16d281c
Author: Dan Halperin <dhalperi@google.com>
Authored: Thu May 11 10:45:20 2017 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Thu May 11 12:39:22 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/io/AvroIO.java    |  2 ++
 .../main/java/org/apache/beam/sdk/io/TextIO.java    |  2 ++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java     | 16 ++++++++++++++++
 .../java/org/apache/beam/sdk/io/TextIOTest.java     | 12 +++++++++++-
 4 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3eb24345/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index c7e7233..d13c6ff 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -441,6 +441,8 @@ public class AvroIO {
           "Cannot set a filename policy and also a filename template or suffix.");
       checkState(getSchema() != null,
           "Need to set the schema of an AvroIO.Write transform.");
+      checkState(!getWindowedWrites() || (getFilenamePolicy() != null),
+          "When using windowed writes, a filename policy must be set via withFilenamePolicy().");
 
       FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
       if (usedFilenamePolicy == null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/3eb24345/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index dbfaeee..af6a069 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -430,6 +430,8 @@ public class TextIO {
           (getFilenamePolicy() == null)
               || (getShardTemplate() == null && getFilenameSuffix() == null),
           "Cannot set a filename policy and also a filename template or suffix.");
+      checkState(!getWindowedWrites() || (getFilenamePolicy() != null),
+          "When using windowed writes, a filename policy must be set via withFilenamePolicy().");
 
       FilenamePolicy usedFilenamePolicy = getFilenamePolicy();
       if (usedFilenamePolicy == null) {

http://git-wip-us.apache.org/repos/asf/beam/blob/3eb24345/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 47b847f..d71f2f7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -54,6 +54,7 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
@@ -77,6 +78,7 @@ import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -93,6 +95,9 @@ public class AvroIOTest {
   @Rule
   public TemporaryFolder tmpFolder = new TemporaryFolder();
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   @Test
   public void testAvroIOGetName() {
     assertEquals("AvroIO.Read", AvroIO.read(String.class).from("/tmp/foo*/baz").getName());
@@ -560,4 +565,15 @@ public class AvroIOTest {
     assertThat(displayData, hasDisplayItem("numShards", 100));
     assertThat(displayData, hasDisplayItem("codec", CodecFactory.snappyCodec().toString()));
   }
+
+  @Test
+  public void testWindowedWriteRequiresFilenamePolicy() {
+    PCollection<String> emptyInput = p.apply(Create.empty(StringUtf8Coder.of()));
+    AvroIO.Write write = AvroIO.write(String.class).to("/tmp/some/file").withWindowedWrites();
+
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage(
+        "When using windowed writes, a filename policy must be set via withFilenamePolicy()");
+    emptyInput.apply(write);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3eb24345/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 685da82..0d8fbbd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -107,7 +107,6 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 @SuppressWarnings("unchecked")
 public class TextIOTest {
-
   private static final String MY_HEADER = "myHeader";
   private static final String MY_FOOTER = "myFooter";
   private static final String[] EMPTY = new String[] {};
@@ -1103,5 +1102,16 @@ public class TextIOTest {
     assertThat(splits, hasSize(equalTo(1)));
     SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
   }
+
+  @Test
+  public void testWindowedWriteRequiresFilenamePolicy() {
+    PCollection<String> emptyInput = p.apply(Create.empty(StringUtf8Coder.of()));
+    TextIO.Write write = TextIO.write().to("/tmp/some/file").withWindowedWrites();
+
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage(
+        "When using windowed writes, a filename policy must be set via withFilenamePolicy()");
+    emptyInput.apply(write);
+  }
 }
 


Mime
View raw message