beam-commits mailing list archives

From: j...@apache.org
Subject: [1/6] beam git commit: Adds AvroIO watchForNewFiles
Date: Wed, 30 Aug 2017 19:11:36 GMT
Repository: beam
Updated Branches:
  refs/heads/master d64f2cce8 -> 5c2cab017


Adds AvroIO watchForNewFiles


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f1f39871
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f1f39871
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f1f39871

Branch: refs/heads/master
Commit: f1f39871da3668bb2ffbc1c27449d36c995b645b
Parents: 82b0852
Author: Eugene Kirpichov <ekirpichov@gmail.com>
Authored: Wed Aug 16 14:41:32 2017 -0700
Committer: Eugene Kirpichov <ekirpichov@gmail.com>
Committed: Wed Aug 30 11:55:18 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 132 ++++++++++++++++++-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  94 +++++++++++--
 2 files changed, 212 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
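
For anyone who wants to exercise the new API against this branch, here is a minimal usage sketch of Read#watchForNewFiles, modeled on the Javadoc example added in the diff below. The schema, bucket path, and timing values are placeholders for illustration only, not something this commit prescribes.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class AvroWatchForNewFilesSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Placeholder schema; in practice, use the schema the files were written with.
    Schema schema =
        new Schema.Parser()
            .parse(
                "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":"
                    + "[{\"name\":\"id\",\"type\":\"long\"}]}");

    // The resulting PCollection is unbounded: the filepattern keeps being polled
    // for new files until the termination condition fires.
    PCollection<GenericRecord> records =
        p.apply(
            AvroIO.readGenericRecords(schema)
                .from("gs://my_bucket/path/to/records-*.avro") // placeholder path
                .watchForNewFiles(
                    // Check for new files every minute.
                    Duration.standardMinutes(1),
                    // Stop watching once no new files appear for an hour.
                    Watch.Growth.<Void>afterTimeSinceNewOutput(Duration.standardHours(1))));

    p.run().waitUntilFinish();
  }
}

Every watchForNewFiles overload added in this commit takes the same pair of arguments, a poll interval and a Watch.Growth termination condition; Read and Parse wrap the condition with ignoreInput before delegating to ReadAll and ParseAll.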


http://git-wip-us.apache.org/repos/asf/beam/blob/f1f39871/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 9601a7d..f6f3308 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.sdk.transforms.Watch.Growth.ignoreInput;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
@@ -48,12 +49,14 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.joda.time.Duration;
 
 /**
  * {@link PTransform}s for reading and writing Avro files.
@@ -76,6 +79,9 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  * allows them in case the filepattern contains a glob wildcard character. Use {@link
  * Read#withEmptyMatchTreatment} to configure this behavior.
  *
+ * <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles}
+ * allows streaming of new files matching the filepattern(s).
+ *
  * <h3>Reading records of a known schema</h3>
  *
 * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
@@ -137,6 +143,20 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  *     filepatterns.apply(AvroIO.parseAllGenericRecords(new SerializableFunction...);
  * }</pre>
  *
+ * <h3>Streaming new files matching a filepattern</h3>
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<AvroAutoGenClass> lines = p.apply(AvroIO
+ *     .read(AvroAutoGenClass.class)
+ *     .from("gs://my_bucket/path/to/records-*.avro")
+ *     .watchForNewFiles(
+ *       // Check for new files every minute
+ *       Duration.standardMinutes(1),
+ *       // Stop watching the filepattern if no new files appear within an hour
+ *       afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
  * <h3>Reading a very large number of files</h3>
  *
 * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
@@ -406,6 +426,8 @@ public class AvroIO {
   public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
     @Nullable abstract ValueProvider<String> getFilepattern();
     abstract EmptyMatchTreatment getEmptyMatchTreatment();
+    @Nullable abstract Duration getWatchForNewFilesInterval();
+    @Nullable abstract TerminationCondition<?, ?> getWatchForNewFilesTerminationCondition();
     @Nullable abstract Class<T> getRecordClass();
     @Nullable abstract Schema getSchema();
     abstract boolean getHintMatchesManyFiles();
@@ -416,6 +438,9 @@ public class AvroIO {
     abstract static class Builder<T> {
       abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
       abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
+      abstract Builder<T> setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
+      abstract Builder<T> setWatchForNewFilesTerminationCondition(
+              TerminationCondition<?, ?> condition);
       abstract Builder<T> setRecordClass(Class<T> recordClass);
       abstract Builder<T> setSchema(Schema schema);
       abstract Builder<T> setHintMatchesManyFiles(boolean hintManyFiles);
@@ -446,6 +471,24 @@ public class AvroIO {
     }
 
     /**
+     * Continuously watches for new files matching the filepattern, polling it at the given
+     * interval, until the given termination condition is reached. The returned {@link PCollection}
+     * is unbounded.
+     *
+     * <p>This works only in runners supporting {@link Kind#SPLITTABLE_DO_FN}.
+     *
+     * @see TerminationCondition
+     */
+    @Experimental(Kind.SPLITTABLE_DO_FN)
+    public Read<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<?, ?> terminationCondition) {
+      return toBuilder()
+          .setWatchForNewFilesInterval(pollInterval)
+          .setWatchForNewFilesTerminationCondition(terminationCondition)
+          .build();
+    }
+
+    /**
      * Hints that the filepattern specified in {@link #from(String)} matches a very large number of
      * files.
      *
@@ -463,7 +506,7 @@ public class AvroIO {
       checkNotNull(getFilepattern(), "filepattern");
       checkNotNull(getSchema(), "schema");
 
-      if (!getHintMatchesManyFiles()) {
+      if (getWatchForNewFilesInterval() == null && !getHintMatchesManyFiles()) {
         return input.apply(
             "Read",
             org.apache.beam.sdk.io.Read.from(
@@ -477,6 +520,11 @@ public class AvroIO {
               ? (ReadAll<T>) readAllGenericRecords(getSchema())
               : readAll(getRecordClass());
       readAll = readAll.withEmptyMatchTreatment(getEmptyMatchTreatment());
+      if (getWatchForNewFilesInterval() != null) {
+        TerminationCondition<String, ?> readAllCondition =
+            ignoreInput(getWatchForNewFilesTerminationCondition());
+        readAll = readAll.watchForNewFiles(getWatchForNewFilesInterval(), readAllCondition);
+      }
       return input
           .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
           .apply("Via ReadAll", readAll);
@@ -490,7 +538,10 @@ public class AvroIO {
               DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
           .add(
               DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
-                  .withLabel("Treatment of filepatterns that match no files"));
+                  .withLabel("Treatment of filepatterns that match no files"))
+          .addIfNotNull(
+              DisplayData.item("watchForNewFilesInterval", getWatchForNewFilesInterval())
+                  .withLabel("Interval to watch for new files"));
     }
 
     @SuppressWarnings("unchecked")
@@ -513,6 +564,8 @@ public class AvroIO {
   @AutoValue
   public abstract static class ReadAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
     abstract EmptyMatchTreatment getEmptyMatchTreatment();
+    @Nullable abstract Duration getWatchForNewFilesInterval();
+    @Nullable abstract TerminationCondition<String, ?> getWatchForNewFilesTerminationCondition();
     @Nullable abstract Class<T> getRecordClass();
     @Nullable abstract Schema getSchema();
     abstract long getDesiredBundleSizeBytes();
@@ -522,6 +575,9 @@ public class AvroIO {
     @AutoValue.Builder
     abstract static class Builder<T> {
       abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
+      abstract Builder<T> setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
+      abstract Builder<T> setWatchForNewFilesTerminationCondition(
+              TerminationCondition<String, ?> condition);
       abstract Builder<T> setRecordClass(Class<T> recordClass);
       abstract Builder<T> setSchema(Schema schema);
       abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
@@ -534,6 +590,16 @@ public class AvroIO {
       return toBuilder().setEmptyMatchTreatment(treatment).build();
     }
 
+    /** Like {@link Read#watchForNewFiles}. */
+    @Experimental(Kind.SPLITTABLE_DO_FN)
+    public ReadAll<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return toBuilder()
+              .setWatchForNewFilesInterval(pollInterval)
+              .setWatchForNewFilesTerminationCondition(terminationCondition)
+              .build();
+    }
+
     @VisibleForTesting
     ReadAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
       return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
@@ -544,6 +610,11 @@ public class AvroIO {
       checkNotNull(getSchema(), "schema");
       Match.Filepatterns matchFilepatterns =
           Match.filepatterns().withEmptyMatchTreatment(getEmptyMatchTreatment());
+      if (getWatchForNewFilesInterval() != null) {
+        matchFilepatterns =
+            matchFilepatterns.continuously(
+                getWatchForNewFilesInterval(), getWatchForNewFilesTerminationCondition());
+      }
 
       return input
           .apply(matchFilepatterns)
@@ -563,7 +634,10 @@ public class AvroIO {
       builder
           .add(
               DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
-                  .withLabel("Treatment of filepatterns that match no files"));
+                  .withLabel("Treatment of filepatterns that match no files"))
+          .addIfNotNull(
+              DisplayData.item("watchForNewFilesInterval", getWatchForNewFilesInterval())
+                  .withLabel("Interval to watch for new files"));
     }
   }
 
@@ -594,6 +668,8 @@ public class AvroIO {
   public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> {
     @Nullable abstract ValueProvider<String> getFilepattern();
     abstract EmptyMatchTreatment getEmptyMatchTreatment();
+    @Nullable abstract Duration getWatchForNewFilesInterval();
+    @Nullable abstract TerminationCondition<?, ?> getWatchForNewFilesTerminationCondition();
     abstract SerializableFunction<GenericRecord, T> getParseFn();
     @Nullable abstract Coder<T> getCoder();
     abstract boolean getHintMatchesManyFiles();
@@ -604,6 +680,9 @@ public class AvroIO {
     abstract static class Builder<T> {
       abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
       abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
+      abstract Builder<T> setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
+      abstract Builder<T> setWatchForNewFilesTerminationCondition(
+              TerminationCondition<?, ?> condition);
       abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
       abstract Builder<T> setCoder(Coder<T> coder);
       abstract Builder<T> setHintMatchesManyFiles(boolean hintMatchesManyFiles);
@@ -626,6 +705,16 @@ public class AvroIO {
       return toBuilder().setEmptyMatchTreatment(treatment).build();
     }
 
+    /** Like {@link Read#watchForNewFiles}. */
+    @Experimental(Kind.SPLITTABLE_DO_FN)
+    public Parse<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<?, ?> terminationCondition) {
+      return toBuilder()
+          .setWatchForNewFilesInterval(pollInterval)
+          .setWatchForNewFilesTerminationCondition(terminationCondition)
+          .build();
+    }
+
     /** Sets a coder for the result of the parse function. */
     public Parse<T> withCoder(Coder<T> coder) {
       return toBuilder().setCoder(coder).build();
@@ -641,7 +730,7 @@ public class AvroIO {
       checkNotNull(getFilepattern(), "filepattern");
       Coder<T> coder = inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
 
-      if (!getHintMatchesManyFiles()) {
+      if (getWatchForNewFilesInterval() == null && !getHintMatchesManyFiles()) {
         return input.apply(
                 org.apache.beam.sdk.io.Read.from(
                         AvroSource.from(getFilepattern()).withParseFn(getParseFn(), coder)));
@@ -651,6 +740,11 @@ public class AvroIO {
           parseAllGenericRecords(getParseFn())
               .withCoder(coder)
               .withEmptyMatchTreatment(getEmptyMatchTreatment());
+      if (getWatchForNewFilesInterval() != null) {
+        TerminationCondition<String, ?> parseAllCondition =
+            ignoreInput(getWatchForNewFilesTerminationCondition());
+        parseAll = parseAll.watchForNewFiles(getWatchForNewFilesInterval(), parseAllCondition);
+      }
       return input
           .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
           .apply("Via ParseAll", parseAll);
@@ -684,7 +778,10 @@ public class AvroIO {
           .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"))
           .add(
               DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
-                  .withLabel("Treatment of filepatterns that match no files"));
+                  .withLabel("Treatment of filepatterns that match no files"))
+          .addIfNotNull(
+              DisplayData.item("watchForNewFilesInterval", getWatchForNewFilesInterval())
+                  .withLabel("Interval to watch for new files"));
     }
   }
 
@@ -694,6 +791,8 @@ public class AvroIO {
   @AutoValue
   public abstract static class ParseAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
     abstract EmptyMatchTreatment getEmptyMatchTreatment();
+    @Nullable abstract Duration getWatchForNewFilesInterval();
+    @Nullable abstract TerminationCondition<String, ?> getWatchForNewFilesTerminationCondition();
     abstract SerializableFunction<GenericRecord, T> getParseFn();
     @Nullable abstract Coder<T> getCoder();
     abstract long getDesiredBundleSizeBytes();
@@ -703,6 +802,9 @@ public class AvroIO {
     @AutoValue.Builder
     abstract static class Builder<T> {
       abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
+      abstract Builder<T> setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
+      abstract Builder<T> setWatchForNewFilesTerminationCondition(
+          TerminationCondition<String, ?> condition);
       abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
       abstract Builder<T> setCoder(Coder<T> coder);
       abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
@@ -715,6 +817,16 @@ public class AvroIO {
       return toBuilder().setEmptyMatchTreatment(treatment).build();
     }
 
+    /** Like {@link Read#watchForNewFiles}. */
+    @Experimental(Kind.SPLITTABLE_DO_FN)
+    public ParseAll<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return toBuilder()
+          .setWatchForNewFilesInterval(pollInterval)
+          .setWatchForNewFilesTerminationCondition(terminationCondition)
+          .build();
+    }
+
     /** Specifies the coder for the result of the {@code parseFn}. */
     public ParseAll<T> withCoder(Coder<T> coder) {
       return toBuilder().setCoder(coder).build();
@@ -742,6 +854,11 @@ public class AvroIO {
           };
       Match.Filepatterns matchFilepatterns =
               Match.filepatterns().withEmptyMatchTreatment(emptyMatchTreatment);
+      if (getWatchForNewFilesInterval() != null) {
+        matchFilepatterns =
+                matchFilepatterns.continuously(
+                        getWatchForNewFilesInterval(), getWatchForNewFilesTerminationCondition());
+      }
       return input
           .apply(matchFilepatterns)
           .apply(
@@ -760,7 +877,10 @@ public class AvroIO {
           .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"))
           .add(
               DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
-                  .withLabel("Treatment of filepatterns that match no files"));
+                  .withLabel("Treatment of filepatterns that match no files"))
+          .addIfNotNull(
+              DisplayData.item("watchForNewFilesInterval", getWatchForNewFilesInterval())
+                  .withLabel("Interval to watch for new files"));
     }
   }
 

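ReadAll and ParseAll gain the same method with the termination condition typed on the input filepattern (String); internally they hand it to Match.filepatterns().continuously(...). A sketch of the ReadAll flavor, again with placeholder patterns, schema, and timings, might look like this:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class AvroReadAllWatchSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Placeholder schema, as in the sketch above.
    Schema schema =
        new Schema.Parser()
            .parse(
                "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":"
                    + "[{\"name\":\"id\",\"type\":\"long\"}]}");

    // Each element is a filepattern; every pattern is matched continuously.
    PCollection<String> patterns =
        p.apply(Create.of("gs://my_bucket/first-*.avro", "gs://my_bucket/second-*.avro"));

    PCollection<GenericRecord> records =
        patterns.apply(
            AvroIO.readAllGenericRecords(schema)
                .watchForNewFiles(
                    // Poll each filepattern every 30 seconds.
                    Duration.standardSeconds(30),
                    // Give up on a pattern after 10 minutes without new files.
                    Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardMinutes(10))));

    p.run().waitUntilFinish();
  }
}

The test below follows the same pattern, using short intervals so the watch terminates quickly.
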
http://git-wip-us.apache.org/repos/asf/beam/blob/f1f39871/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index d0aa02c..f49443d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -57,6 +57,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.Nullable;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -73,8 +74,11 @@ import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -260,19 +264,84 @@ public class AvroIOTest {
                 .withNumShards(3));
     writePipeline.run().waitUntilFinish();
 
-    // Test read(), readAll(), and parseAllGenericRecords().
+    // Test readAll() and parseAllGenericRecords().
+    PCollection<String> paths =
+        readPipeline.apply(
+            "Create paths",
+            Create.of(
+                tmpFolder.getRoot().getAbsolutePath() + "/first*",
+                tmpFolder.getRoot().getAbsolutePath() + "/second*"));
+    PAssert.that(
+            paths.apply(
+                "Read all", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+        .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
+    PAssert.that(
+            paths.apply(
+                "Parse all",
+                AvroIO.parseAllGenericRecords(new ParseGenericClass())
+                    .withCoder(AvroCoder.of(GenericClass.class))
+                    .withDesiredBundleSizeBytes(10)))
+        .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
+
+    readPipeline.run();
+  }
+
+  private static class CreateGenericClass extends SimpleFunction<Long, GenericClass> {
+    @Override
+    public GenericClass apply(Long i) {
+      return new GenericClass(i.intValue(), "value" + i);
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testAvroIOContinuouslyWriteAndReadMultipleFilepatterns() throws Throwable {
+    SimpleFunction<Long, GenericClass> mapFn = new CreateGenericClass();
+    List<GenericClass> firstValues = Lists.newArrayList();
+    List<GenericClass> secondValues = Lists.newArrayList();
+    for (int i = 0; i < 7; ++i) {
+      (i < 3 ? firstValues : secondValues).add(mapFn.apply((long) i));
+    }
+    writePipeline.apply(
+            "Sequence first",
+            GenerateSequence.from(0).to(3).withRate(1, Duration.millis(300)))
+        .apply("Map first", MapElements.via(mapFn))
+        .apply(
+            "Write first",
+            AvroIO.write(GenericClass.class)
+                .to(tmpFolder.getRoot().getAbsolutePath() + "/first")
+                .withNumShards(2));
+    writePipeline.apply(
+            "Sequence second",
+            GenerateSequence.from(3).to(7).withRate(1, Duration.millis(300)))
+        .apply("Map second", MapElements.via(mapFn))
+        .apply(
+            "Write second",
+            AvroIO.write(GenericClass.class)
+                .to(tmpFolder.getRoot().getAbsolutePath() + "/second")
+                .withNumShards(3));
+    PipelineResult writeRes = writePipeline.run();
+
+    // Test read(), readAll(), parse(), and parseAllGenericRecords() with watchForNewFiles().
     PAssert.that(
             readPipeline.apply(
-                "Read first",
+                "Read",
                 AvroIO.read(GenericClass.class)
-                    .from(tmpFolder.getRoot().getAbsolutePath() + "/first*")))
+                    .from(tmpFolder.getRoot().getAbsolutePath() + "/first*")
+                    .watchForNewFiles(
+                        Duration.millis(100),
+                        Watch.Growth.<Void>afterTimeSinceNewOutput(Duration.standardSeconds(3)))))
         .containsInAnyOrder(firstValues);
     PAssert.that(
             readPipeline.apply(
-                "Read second",
-                AvroIO.read(GenericClass.class)
-                    .from(tmpFolder.getRoot().getAbsolutePath() + "/second*")))
-        .containsInAnyOrder(secondValues);
+                "Parse",
+                AvroIO.parseGenericRecords(new ParseGenericClass())
+                    .from(tmpFolder.getRoot().getAbsolutePath() + "/first*")
+                    .watchForNewFiles(
+                        Duration.millis(100),
+                        Watch.Growth.<Void>afterTimeSinceNewOutput(Duration.standardSeconds(3)))))
+        .containsInAnyOrder(firstValues);
+
     PCollection<String> paths =
         readPipeline.apply(
             "Create paths",
@@ -281,17 +350,26 @@ public class AvroIOTest {
                 tmpFolder.getRoot().getAbsolutePath() + "/second*"));
     PAssert.that(
             paths.apply(
-                "Read all", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+                "Read all",
+                AvroIO.readAll(GenericClass.class)
+                    .watchForNewFiles(
+                        Duration.millis(100),
+                        Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3)))
+                    .withDesiredBundleSizeBytes(10)))
         .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
     PAssert.that(
             paths.apply(
                 "Parse all",
                 AvroIO.parseAllGenericRecords(new ParseGenericClass())
                     .withCoder(AvroCoder.of(GenericClass.class))
+                    .watchForNewFiles(
+                        Duration.millis(100),
+                        Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3)))
                     .withDesiredBundleSizeBytes(10)))
         .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
 
     readPipeline.run();
+    writeRes.waitUntilFinish();
   }
 
   @Test

