beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: Copy and use UnboundedReadFromBoundedSource in dataflow runner
Date Thu, 30 Jun 2016 05:45:57 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master e01efbda3 -> 12b6ff8d7


Copy and use UnboundedReadFromBoundedSource in dataflow runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4f2ac394
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4f2ac394
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4f2ac394

Branch: refs/heads/master
Commit: 4f2ac394c207e0807042013fcab75296d8cf61df
Parents: e01efbd
Author: Pei He <peihe@google.com>
Authored: Mon Jun 27 17:29:14 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Wed Jun 29 22:45:44 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/dataflow/DataflowRunner.java   |  34 +-
 .../DataflowUnboundedReadFromBoundedSource.java | 547 +++++++++++++++++++
 .../runners/dataflow/DataflowRunnerTest.java    |  30 -
 3 files changed, 577 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f2ac394/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 70dd94f..2ba6c7b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -30,6 +30,7 @@ import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TransformTran
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.TranslationContext;
 import org.apache.beam.runners.dataflow.internal.AssignWindows;
 import org.apache.beam.runners.dataflow.internal.DataflowAggregatorTransforms;
+import org.apache.beam.runners.dataflow.internal.DataflowUnboundedReadFromBoundedSource;
 import org.apache.beam.runners.dataflow.internal.IsmFormat;
 import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
 import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
@@ -60,6 +61,7 @@ import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.BigQueryIO;
+import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.PubsubIO;
 import org.apache.beam.sdk.io.PubsubUnboundedSink;
@@ -350,11 +352,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob>
{
       builder.put(View.AsIterable.class, StreamingViewAsIterable.class);
       builder.put(Write.Bound.class, StreamingWrite.class);
       builder.put(Read.Unbounded.class, StreamingUnboundedRead.class);
-      builder.put(Read.Bounded.class, UnsupportedIO.class);
-      builder.put(AvroIO.Read.Bound.class, UnsupportedIO.class);
+      builder.put(Read.Bounded.class, StreamingBoundedRead.class);
       builder.put(AvroIO.Write.Bound.class, UnsupportedIO.class);
-      builder.put(BigQueryIO.Read.Bound.class, UnsupportedIO.class);
-      builder.put(TextIO.Read.Bound.class, UnsupportedIO.class);
       builder.put(TextIO.Write.Bound.class, UnsupportedIO.class);
       builder.put(Window.Bound.class, AssignWindows.class);
       // In streaming mode must use either the custom Pubsub unbounded source/sink or
@@ -2366,6 +2365,33 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob>
{
   }
 
   /**
+   * Specialized implementation for {@link org.apache.beam.sdk.io.Read.Bounded Read.Bounded}
for the
+   * Dataflow runner in streaming mode.
+   */
+  private static class StreamingBoundedRead<T> extends PTransform<PInput, PCollection<T>>
{
+    private final BoundedSource<T> source;
+
+    /** Builds an instance of this class from the overridden transform. */
+    @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
+    public StreamingBoundedRead(DataflowRunner runner, Read.Bounded<T> transform) {
+      this.source = transform.getSource();
+    }
+
+    @Override
+    protected Coder<T> getDefaultOutputCoder() {
+      return source.getDefaultOutputCoder();
+    }
+
+    @Override
+    public final PCollection<T> apply(PInput input) {
+      source.validate();
+
+      return Pipeline.applyTransform(input, new DataflowUnboundedReadFromBoundedSource<>(source))
+          .setIsBoundedInternal(IsBounded.BOUNDED);
+    }
+  }
+
+  /**
    * Specialized implementation for
    * {@link org.apache.beam.sdk.transforms.Create.Values Create.Values} for the
    * Dataflow runner in streaming mode.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f2ac394/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
new file mode 100644
index 0000000..5e035bc
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowUnboundedReadFromBoundedSource.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.internal;
+
+import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link PTransform} that converts a {@link BoundedSource} as an {@link UnboundedSource}.
+ *
+ * <p>{@link BoundedSource} is read directly without calling {@link BoundedSource#splitIntoBundles},
+ * and element timestamps are propagated. While any elements remain, the watermark is the
beginning
+ * of time {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, and after all elements have been produced
+ * the watermark goes to the end of time {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+ *
+ * <p>Checkpoints are created by calling {@link BoundedReader#splitAtFraction} on inner
+ * {@link BoundedSource}.
+ * Sources that cannot be split are read entirely into memory, so this transform does not
work well
+ * with large, unsplittable sources.
+ *
+ * <p>This transform is intended to be used by a runner during pipeline translation
to convert
+ * a Read.Bounded into a Read.Unbounded.
+ *
+ * @deprecated This class is copied from beam runners core in order to avoid pipeline construction
+ * time dependency. It should be replaced in the dataflow worker as an execution time dependency.
+ */
+@Deprecated
+public class DataflowUnboundedReadFromBoundedSource<T> extends PTransform<PInput,
PCollection<T>> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DataflowUnboundedReadFromBoundedSource.class);
+
+  private final BoundedSource<T> source;
+
+  /**
+   * Constructs a {@link PTransform} that performs an unbounded read from a {@link BoundedSource}.
+   */
+  public DataflowUnboundedReadFromBoundedSource(BoundedSource<T> source) {
+    this.source = source;
+  }
+
+  @Override
+  public PCollection<T> apply(PInput input) {
+    return input.getPipeline().apply(
+        Read.from(new BoundedToUnboundedSourceAdapter<>(source)));
+  }
+
+  @Override
+  protected Coder<T> getDefaultOutputCoder() {
+    return source.getDefaultOutputCoder();
+  }
+
+  @Override
+  public String getKindString() {
+    return "Read(" + approximateSimpleName(source.getClass()) + ")";
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    // We explicitly do not register base-class data, instead we use the delegate inner source.
+    builder
+        .add(DisplayData.item("source", source.getClass()))
+        .include(source);
+  }
+
+  /**
+   * A {@code BoundedSource} to {@code UnboundedSource} adapter.
+   */
+  @VisibleForTesting
+  public static class BoundedToUnboundedSourceAdapter<T>
+      extends UnboundedSource<T, BoundedToUnboundedSourceAdapter.Checkpoint<T>>
{
+
+    private BoundedSource<T> boundedSource;
+
+    public BoundedToUnboundedSourceAdapter(BoundedSource<T> boundedSource) {
+      this.boundedSource = boundedSource;
+    }
+
+    @Override
+    public void validate() {
+      boundedSource.validate();
+    }
+
+    @Override
+    public List<BoundedToUnboundedSourceAdapter<T>> generateInitialSplits(
+        int desiredNumSplits, PipelineOptions options) throws Exception {
+      try {
+        long desiredBundleSize = boundedSource.getEstimatedSizeBytes(options) / desiredNumSplits;
+        if (desiredBundleSize <= 0) {
+          LOG.warn("BoundedSource {} cannot estimate its size, skips the initial splits.",
+              boundedSource);
+          return ImmutableList.of(this);
+        }
+        List<? extends BoundedSource<T>> splits
+            = boundedSource.splitIntoBundles(desiredBundleSize, options);
+        if (splits == null) {
+          LOG.warn("BoundedSource cannot split {}, skips the initial splits.", boundedSource);
+          return ImmutableList.of(this);
+        }
+        return Lists.transform(
+            splits,
+            new Function<BoundedSource<T>, BoundedToUnboundedSourceAdapter<T>>()
{
+              @Override
+              public BoundedToUnboundedSourceAdapter<T> apply(BoundedSource<T>
input) {
+                return new BoundedToUnboundedSourceAdapter<>(input);
+              }});
+      } catch (Exception e) {
+        LOG.warn("Exception while splitting {}, skips the initial splits.", boundedSource,
e);
+        return ImmutableList.of(this);
+      }
+    }
+
+    @Override
+    public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint)
+        throws IOException {
+      if (checkpoint == null) {
+        return new Reader(null /* residualElements */, boundedSource, options);
+      } else {
+        return new Reader(checkpoint.residualElements, checkpoint.residualSource, options);
+      }
+    }
+
+    @Override
+    public Coder<T> getDefaultOutputCoder() {
+      return boundedSource.getDefaultOutputCoder();
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    @Override
+    public Coder<Checkpoint<T>> getCheckpointMarkCoder() {
+      return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
+    }
+
+    @VisibleForTesting
+    static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
+      private final @Nullable List<TimestampedValue<T>> residualElements;
+      private final @Nullable BoundedSource<T> residualSource;
+
+      public Checkpoint(
+          @Nullable List<TimestampedValue<T>> residualElements,
+          @Nullable BoundedSource<T> residualSource) {
+        this.residualElements = residualElements;
+        this.residualSource = residualSource;
+      }
+
+      @Override
+      public void finalizeCheckpoint() {}
+
+      @VisibleForTesting
+      @Nullable List<TimestampedValue<T>> getResidualElements() {
+        return residualElements;
+      }
+
+      @VisibleForTesting
+      @Nullable BoundedSource<T> getResidualSource() {
+        return residualSource;
+      }
+    }
+
+    @VisibleForTesting
+    static class CheckpointCoder<T> extends StandardCoder<Checkpoint<T>>
{
+
+      @JsonCreator
+      public static CheckpointCoder<?> of(
+          @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+          List<Coder<?>> components) {
+        checkArgument(components.size() == 1,
+            "Expecting 1 components, got %s", components.size());
+        return new CheckpointCoder<>(components.get(0));
+      }
+
+      // The coder for a list of residual elements and their timestamps
+      private final Coder<List<TimestampedValue<T>>> elemsCoder;
+      // The coder from the BoundedReader for coding each element
+      private final Coder<T> elemCoder;
+      // The nullable and serializable coder for the BoundedSource.
+      @SuppressWarnings("rawtypes")
+      private final Coder<BoundedSource> sourceCoder;
+
+      CheckpointCoder(Coder<T> elemCoder) {
+        this.elemsCoder = NullableCoder.of(
+            ListCoder.of(TimestampedValue.TimestampedValueCoder.of(elemCoder)));
+        this.elemCoder = elemCoder;
+        this.sourceCoder = NullableCoder.of(SerializableCoder.of(BoundedSource.class));
+      }
+
+      @Override
+      public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
+          throws CoderException, IOException {
+        Context nested = context.nested();
+        elemsCoder.encode(value.residualElements, outStream, nested);
+        sourceCoder.encode(value.residualSource, outStream, nested);
+      }
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public Checkpoint<T> decode(InputStream inStream, Context context)
+          throws CoderException, IOException {
+        Context nested = context.nested();
+        return new Checkpoint<>(
+            elemsCoder.decode(inStream, nested),
+            sourceCoder.decode(inStream, nested));
+      }
+
+      @Override
+      public List<Coder<?>> getCoderArguments() {
+        return Arrays.<Coder<?>>asList(elemCoder);
+      }
+
+      @Override
+      public void verifyDeterministic() throws NonDeterministicException {
+        throw new NonDeterministicException(this,
+            "CheckpointCoder uses Java Serialization, which may be non-deterministic.");
+      }
+    }
+
+    /**
+     * An {@code UnboundedReader<T>} that wraps a {@code BoundedSource<T>} into
+     * {@link ResidualElements} and {@link ResidualSource}.
+     *
+     * <p>In the initial state, {@link ResidualElements} is null and {@link ResidualSource}
contains
+     * the {@code BoundedSource<T>}. After the first checkpoint, the {@code BoundedSource<T>}
will
+     * be split into {@link ResidualElements} and {@link ResidualSource}.
+     */
+    @VisibleForTesting
+    class Reader extends UnboundedReader<T> {
+      private ResidualElements residualElements;
+      private @Nullable ResidualSource residualSource;
+      private final PipelineOptions options;
+      private boolean done;
+
+      Reader(
+          @Nullable List<TimestampedValue<T>> residualElementsList,
+          @Nullable BoundedSource<T> residualSource,
+          PipelineOptions options) {
+        init(residualElementsList, residualSource, options);
+        this.options = checkNotNull(options, "options");
+        this.done = false;
+      }
+
+      private void init(
+          @Nullable List<TimestampedValue<T>> residualElementsList,
+          @Nullable BoundedSource<T> residualSource,
+          PipelineOptions options) {
+        this.residualElements = residualElementsList == null
+            ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList())
+                : new ResidualElements(residualElementsList);
+        this.residualSource =
+            residualSource == null ? null : new ResidualSource(residualSource, options);
+      }
+
+      @Override
+      public boolean start() throws IOException {
+        return advance();
+      }
+
+      @Override
+      public boolean advance() throws IOException {
+        if (residualElements.advance()) {
+          return true;
+        } else if (residualSource != null && residualSource.advance()) {
+          return true;
+        } else {
+          done = true;
+          return false;
+        }
+      }
+
+      @Override
+      public void close() throws IOException {
+        if (residualSource != null) {
+          residualSource.close();
+        }
+      }
+
+      @Override
+      public T getCurrent() throws NoSuchElementException {
+        if (residualElements.hasCurrent()) {
+          return residualElements.getCurrent();
+        } else if (residualSource != null) {
+          return residualSource.getCurrent();
+        } else {
+          throw new NoSuchElementException();
+        }
+      }
+
+      @Override
+      public Instant getCurrentTimestamp() throws NoSuchElementException {
+        if (residualElements.hasCurrent()) {
+          return residualElements.getCurrentTimestamp();
+        } else if (residualSource != null) {
+          return residualSource.getCurrentTimestamp();
+        } else {
+          throw new NoSuchElementException();
+        }
+      }
+
+      @Override
+      public Instant getWatermark() {
+        return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
+      }
+
+      /**
+       * {@inheritDoc}
+       *
+       * <p>If only part of the {@link ResidualElements} is consumed, the new
+       * checkpoint will contain the remaining elements in {@link ResidualElements} and
+       * the {@link ResidualSource}.
+       *
+       * <p>If all {@link ResidualElements} and part of the
+       * {@link ResidualSource} are consumed, the new checkpoint is done by splitting
+       * {@link ResidualSource} into new {@link ResidualElements} and {@link ResidualSource}.
+       * {@link ResidualSource} is the source split from the current source,
+       * and {@link ResidualElements} contains rest elements from the current source after
+       * the splitting. For unsplittable source, it will put all remaining elements into
+       * the {@link ResidualElements}.
+       */
+      @Override
+      public Checkpoint<T> getCheckpointMark() {
+        Checkpoint<T> newCheckpoint;
+        if (!residualElements.done()) {
+          // Part of residualElements are consumed.
+          // Checkpoints the remaining elements and residualSource.
+          newCheckpoint = new Checkpoint<>(
+              residualElements.getRestElements(),
+              residualSource == null ? null : residualSource.getSource());
+        } else if (residualSource != null) {
+          newCheckpoint = residualSource.getCheckpointMark();
+        } else {
+          newCheckpoint = new Checkpoint<>(null /* residualElements */, null /* residualSource
*/);
+        }
+        // Re-initialize since the residualElements and the residualSource might be
+        // consumed or split by checkpointing.
+        init(newCheckpoint.residualElements, newCheckpoint.residualSource, options);
+        return newCheckpoint;
+      }
+
+      @Override
+      public BoundedToUnboundedSourceAdapter<T> getCurrentSource() {
+        return BoundedToUnboundedSourceAdapter.this;
+      }
+    }
+
+    private class ResidualElements {
+      private final List<TimestampedValue<T>> elementsList;
+      private @Nullable Iterator<TimestampedValue<T>> elementsIterator;
+      private @Nullable TimestampedValue<T> currentT;
+      private boolean hasCurrent;
+      private boolean done;
+
+      ResidualElements(List<TimestampedValue<T>> residualElementsList) {
+        this.elementsList = checkNotNull(residualElementsList, "residualElementsList");
+        this.elementsIterator = null;
+        this.currentT = null;
+        this.hasCurrent = false;
+        this.done = false;
+      }
+
+      public boolean advance() {
+        if (elementsIterator == null) {
+          elementsIterator = elementsList.iterator();
+        }
+        if (elementsIterator.hasNext()) {
+          currentT = elementsIterator.next();
+          hasCurrent = true;
+          return true;
+        } else {
+          done = true;
+          hasCurrent = false;
+          return false;
+        }
+      }
+
+      boolean hasCurrent() {
+        return hasCurrent;
+      }
+
+      boolean done() {
+        return done;
+      }
+
+      TimestampedValue<T> getCurrentTimestampedValue() {
+        if (!hasCurrent) {
+          throw new NoSuchElementException();
+        }
+        return currentT;
+      }
+
+      T getCurrent() {
+        return getCurrentTimestampedValue().getValue();
+      }
+
+      Instant getCurrentTimestamp() {
+        return getCurrentTimestampedValue().getTimestamp();
+      }
+
+      List<TimestampedValue<T>> getRestElements() {
+        if (elementsIterator == null) {
+          return elementsList;
+        } else {
+          List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
+          while (elementsIterator.hasNext()) {
+            newResidualElements.add(elementsIterator.next());
+          }
+          return newResidualElements;
+        }
+      }
+    }
+
+    private class ResidualSource {
+      private BoundedSource<T> residualSource;
+      private PipelineOptions options;
+      private @Nullable BoundedReader<T> reader;
+      private boolean closed;
+
+      public ResidualSource(BoundedSource<T> residualSource, PipelineOptions options)
{
+        this.residualSource = checkNotNull(residualSource, "residualSource");
+        this.options = checkNotNull(options, "options");
+        this.reader = null;
+        this.closed = false;
+      }
+
+      private boolean advance() throws IOException {
+        if (reader == null && !closed) {
+          reader = residualSource.createReader(options);
+          return reader.start();
+        } else {
+          return reader.advance();
+        }
+      }
+
+      T getCurrent() throws NoSuchElementException {
+        if (reader == null) {
+          throw new NoSuchElementException();
+        }
+        return reader.getCurrent();
+      }
+
+      Instant getCurrentTimestamp() throws NoSuchElementException {
+        if (reader == null) {
+          throw new NoSuchElementException();
+        }
+        return reader.getCurrentTimestamp();
+      }
+
+      void close() throws IOException {
+        if (reader != null) {
+          reader.close();
+          reader = null;
+        }
+        closed = true;
+      }
+
+      BoundedSource<T> getSource() {
+        return residualSource;
+      }
+
+      Checkpoint<T> getCheckpointMark() {
+        if (reader == null) {
+          // Reader hasn't started, checkpoint the residualSource.
+          return new Checkpoint<>(null /* residualElements */, residualSource);
+        } else {
+          // Part of residualSource are consumed.
+          // Splits the residualSource and tracks the new residualElements in current source.
+          BoundedSource<T> residualSplit = null;
+          Double fractionConsumed = reader.getFractionConsumed();
+          if (fractionConsumed != null && 0 <= fractionConsumed && fractionConsumed
<= 1) {
+            double fractionRest = 1 - fractionConsumed;
+            int splitAttempts = 8;
+            for (int i = 0; i < 8 && residualSplit == null; ++i) {
+              double fractionToSplit = fractionConsumed + fractionRest * i / splitAttempts;
+              residualSplit = reader.splitAtFraction(fractionToSplit);
+            }
+          }
+          List<TimestampedValue<T>> newResidualElements = Lists.newArrayList();
+          try {
+            while (advance()) {
+              newResidualElements.add(
+                  TimestampedValue.of(reader.getCurrent(), reader.getCurrentTimestamp()));
+            }
+          } catch (IOException e) {
+            throw new RuntimeException("Failed to read elements from the bounded reader.",
e);
+          }
+          return new Checkpoint<>(newResidualElements, residualSplit);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f2ac394/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 999dc3a..0cf1ade 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -56,8 +56,6 @@ import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.io.AvroSource;
-import org.apache.beam.sdk.io.BigQueryIO;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -898,34 +896,6 @@ public class DataflowRunnerTest {
   }
 
   @Test
-  public void testBoundedSourceUnsupportedInStreaming() throws Exception {
-    testUnsupportedSource(
-        AvroSource.readFromFileWithClass("foo", String.class), "Read.Bounded", true);
-  }
-
-  @Test
-  public void testBigQueryIOSourceUnsupportedInStreaming() throws Exception {
-    testUnsupportedSource(
-        BigQueryIO.Read.from("project:bar.baz").withoutValidation(), "BigQueryIO.Read", true);
-  }
-
-  @Test
-  public void testAvroIOSourceUnsupportedInStreaming() throws Exception {
-    testUnsupportedSource(
-        AvroIO.Read.from("foo"), "AvroIO.Read", true);
-  }
-
-  @Test
-  public void testTextIOSourceUnsupportedInStreaming() throws Exception {
-    testUnsupportedSource(TextIO.Read.from("foo"), "TextIO.Read", true);
-  }
-
-  @Test
-  public void testReadBoundedSourceUnsupportedInStreaming() throws Exception {
-    testUnsupportedSource(Read.from(AvroSource.from("/tmp/test")), "Read.Bounded", true);
-  }
-
-  @Test
   public void testReadUnboundedUnsupportedInBatch() throws Exception {
     testUnsupportedSource(Read.from(new TestCountingSource(1)), "Read.Unbounded", false);
   }


Mime
View raw message