beam-commits mailing list archives

From dhalp...@apache.org
Subject [41/67] [partial] incubator-beam git commit: Directory reorganization
Date Thu, 24 Mar 2016 02:48:05 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Read.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Read.java
deleted file mode 100644
index cde8769..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Read.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import static com.google.cloud.dataflow.sdk.util.StringUtils.approximateSimpleName;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.SerializableUtils;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
-import com.google.cloud.dataflow.sdk.values.PInput;
-
-import org.joda.time.Duration;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link PTransform} for reading from a {@link Source}.
- *
- * <p>Usage example:
- * <pre>
- * Pipeline p = Pipeline.create();
- * p.apply(Read.from(new MySource().withFoo("foo").withBar("bar"))
- *             .named("foobar"));
- * </pre>
- */
-public class Read {
-  /**
-   * Returns a new {@code Read} {@code PTransform} builder with the given name.
-   */
-  public static Builder named(String name) {
-    return new Builder(name);
-  }
-
-  /**
-   * Returns a new {@code Read.Bounded} {@code PTransform} reading from the given
-   * {@code BoundedSource}.
-   */
-  public static <T> Bounded<T> from(BoundedSource<T> source) {
-    return new Bounded<>(null, source);
-  }
-
-  /**
-   * Returns a new {@code Read.Unbounded} {@code PTransform} reading from the given
-   * {@code UnboundedSource}.
-   */
-  public static <T> Unbounded<T> from(UnboundedSource<T, ?> source) {
-    return new Unbounded<>(null, source);
-  }
-
-  /**
-   * Helper class for building {@code Read} transforms.
-   */
-  public static class Builder {
-    private final String name;
-
-    private Builder(String name) {
-      this.name = name;
-    }
-
-    /**
-     * Returns a new {@code Read.Bounded} {@code PTransform} reading from the given
-     * {@code BoundedSource}.
-     */
-    public <T> Bounded<T> from(BoundedSource<T> source) {
-      return new Bounded<>(name, source);
-    }
-
-    /**
-     * Returns a new {@code Read.Unbounded} {@code PTransform} reading from the given
-     * {@code UnboundedSource}.
-     */
-    public <T> Unbounded<T> from(UnboundedSource<T, ?> source) {
-      return new Unbounded<>(name, source);
-    }
-  }
-
-  /**
-   * {@link PTransform} that reads from a {@link BoundedSource}.
-   */
-  public static class Bounded<T> extends PTransform<PInput, PCollection<T>> {
-    private final BoundedSource<T> source;
-
-    private Bounded(@Nullable String name, BoundedSource<T> source) {
-      super(name);
-      this.source = SerializableUtils.ensureSerializable(source);
-    }
-
-    /**
-     * Returns a new {@code Bounded} {@code PTransform} that's like this one but
-     * has the given name.
-     *
-     * <p>Does not modify this object.
-     */
-    public Bounded<T> named(String name) {
-      return new Bounded<T>(name, source);
-    }
-
-    @Override
-    protected Coder<T> getDefaultOutputCoder() {
-      return source.getDefaultOutputCoder();
-    }
-
-    @Override
-    public final PCollection<T> apply(PInput input) {
-      source.validate();
-
-      return PCollection.<T>createPrimitiveOutputInternal(input.getPipeline(),
-          WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
-          .setCoder(getDefaultOutputCoder());
-    }
-
-    /**
-     * Returns the {@code BoundedSource} used to create this {@code Read} {@code PTransform}.
-     */
-    public BoundedSource<T> getSource() {
-      return source;
-    }
-
-    @Override
-    public String getKindString() {
-      return "Read(" + approximateSimpleName(source.getClass()) + ")";
-    }
-
-    static {
-      registerDefaultTransformEvaluator();
-    }
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private static void registerDefaultTransformEvaluator() {
-      DirectPipelineRunner.registerDefaultTransformEvaluator(
-          Bounded.class,
-          new DirectPipelineRunner.TransformEvaluator<Bounded>() {
-            @Override
-            public void evaluate(
-                Bounded transform, DirectPipelineRunner.EvaluationContext context) {
-              evaluateReadHelper(transform, context);
-            }
-
-            private <T> void evaluateReadHelper(
-                Read.Bounded<T> transform, DirectPipelineRunner.EvaluationContext context) {
-              try {
-                List<DirectPipelineRunner.ValueWithMetadata<T>> output = new ArrayList<>();
-                BoundedSource<T> source = transform.getSource();
-                try (BoundedSource.BoundedReader<T> reader =
-                    source.createReader(context.getPipelineOptions())) {
-                  for (boolean available = reader.start();
-                      available;
-                      available = reader.advance()) {
-                    output.add(
-                        DirectPipelineRunner.ValueWithMetadata.of(
-                            WindowedValue.timestampedValueInGlobalWindow(
-                                reader.getCurrent(), reader.getCurrentTimestamp())));
-                  }
-                }
-                context.setPCollectionValuesWithMetadata(context.getOutput(transform), output);
-              } catch (Exception e) {
-                throw new RuntimeException(e);
-              }
-            }
-          });
-    }
-  }
-
-  /**
-   * {@link PTransform} that reads from an {@link UnboundedSource}.
-   */
-  public static class Unbounded<T> extends PTransform<PInput, PCollection<T>> {
-    private final UnboundedSource<T, ?> source;
-
-    private Unbounded(@Nullable String name, UnboundedSource<T, ?> source) {
-      super(name);
-      this.source = SerializableUtils.ensureSerializable(source);
-    }
-
-    /**
-     * Returns a new {@code Unbounded} {@code PTransform} that's like this one but
-     * has the given name.
-     *
-     * <p>Does not modify this object.
-     */
-    public Unbounded<T> named(String name) {
-      return new Unbounded<T>(name, source);
-    }
-
-    /**
-     * Returns a new {@link BoundedReadFromUnboundedSource} that reads a bounded amount
-     * of data from the given {@link UnboundedSource}.  The bound is specified as a number
-     * of records to read.
-     *
-     * <p>This may take a long time to execute if the splits of this source are slow to read
-     * records.
-     */
-    public BoundedReadFromUnboundedSource<T> withMaxNumRecords(long maxNumRecords) {
-      return new BoundedReadFromUnboundedSource<T>(source, maxNumRecords, null);
-    }
-
-    /**
-     * Returns a new {@link BoundedReadFromUnboundedSource} that reads a bounded amount
-     * of data from the given {@link UnboundedSource}.  The bound is specified as an amount
-     * of time to read for.  Each split of the source will read for this much time.
-     */
-    public BoundedReadFromUnboundedSource<T> withMaxReadTime(Duration maxReadTime) {
-      return new BoundedReadFromUnboundedSource<T>(source, Long.MAX_VALUE, maxReadTime);
-    }
-
-    @Override
-    protected Coder<T> getDefaultOutputCoder() {
-      return source.getDefaultOutputCoder();
-    }
-
-    @Override
-    public final PCollection<T> apply(PInput input) {
-      source.validate();
-
-      return PCollection.<T>createPrimitiveOutputInternal(
-          input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED);
-    }
-
-    /**
-     * Returns the {@code UnboundedSource} used to create this {@code Read} {@code PTransform}.
-     */
-    public UnboundedSource<T, ?> getSource() {
-      return source;
-    }
-
-    @Override
-    public String getKindString() {
-      return "Read(" + approximateSimpleName(source.getClass()) + ")";
-    }
-  }
-}
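
For reference, the deleted Read.java above builds Read transforms from Source objects: Read.named(name).from(source) or Read.from(source) for a BoundedSource, and Read.from(source).withMaxNumRecords(n) or withMaxReadTime(d) to bound a read from an UnboundedSource. Below is a minimal usage sketch; the boundedSource and unboundedSource arguments stand in for hypothetical Source implementations and are not part of this commit.

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.io.BoundedSource;
    import com.google.cloud.dataflow.sdk.io.Read;
    import com.google.cloud.dataflow.sdk.io.UnboundedSource;
    import com.google.cloud.dataflow.sdk.options.PipelineOptions;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    import org.joda.time.Duration;

    class ReadUsageSketch {
      static void buildReads(PipelineOptions options,
          BoundedSource<String> boundedSource,           // hypothetical bounded source
          UnboundedSource<String, ?> unboundedSource) {  // hypothetical unbounded source
        Pipeline p = Pipeline.create(options);

        // Bounded read with an explicit step name supplied through the builder.
        PCollection<String> lines =
            p.apply(Read.named("ReadBounded").from(boundedSource));

        // Unbounded read capped for testing: stop after 1000 records,
        // or read each split for at most 60 seconds.
        PCollection<String> byCount =
            p.apply(Read.from(unboundedSource).withMaxNumRecords(1000));
        PCollection<String> byTime =
            p.apply(Read.from(unboundedSource).withMaxReadTime(Duration.standardSeconds(60)));
      }
    }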

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/ShardNameTemplate.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/ShardNameTemplate.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/ShardNameTemplate.java
deleted file mode 100644
index 7270012..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/ShardNameTemplate.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-/**
- * Standard shard naming templates.
- *
- * <p>Shard naming templates are strings that may contain placeholders for
- * the shard number and shard count.  When constructing a filename for a
- * particular shard number, the upper-case letters 'S' and 'N' are replaced
- * with the 0-padded shard number and shard count respectively.
- *
- * <p>Left-padding of the numbers enables lexicographical sorting of the
- * resulting filenames.  If the shard number or count are too large for the
- * space provided in the template, then the result may no longer sort
- * lexicographically.  For example, a shard template of "S-of-N", for 200
- * shards, will result in outputs named "0-of-200", ..., "10-of-200",
- * "100-of-200", etc.
- *
- * <p>Shard numbers start with 0, so the last shard number is the shard count
- * minus one.  For example, the template "-SSSSS-of-NNNNN" will be
- * instantiated as "-00000-of-01000" for the first shard (shard 0) of a
- * 1000-way sharded output.
- *
- * <p>A shard name template is typically provided along with a name prefix
- * and suffix, which allows constructing complex paths that have embedded
- * shard information.  For example, outputs in the form
- * "gs://bucket/path-01-of-99.txt" could be constructed by providing the
- * individual components:
- *
- * <pre>{@code
- *   pipeline.apply(
- *       TextIO.Write.to("gs://bucket/path")
- *                   .withShardNameTemplate("-SS-of-NN")
- *                   .withSuffix(".txt"))
- * }</pre>
- *
- * <p>In the example above, you could make parts of the output configurable
- * by users without requiring them to specify all components of the output
- * name.
- *
- * <p>If a shard name template does not contain any repeating 'S', then
- * the output shard count must be 1, as otherwise the same filename would be
- * generated for multiple shards.
- */
-public class ShardNameTemplate {
-  /**
-   * Shard name containing the index and max.
-   *
-   * <p>Eg: [prefix]-00000-of-00100[suffix] and
-   * [prefix]-00001-of-00100[suffix]
-   */
-  public static final String INDEX_OF_MAX = "-SSSSS-of-NNNNN";
-
-  /**
-   * Shard is a file within a directory.
-   *
-   * <p>Eg: [prefix]/part-00000[suffix] and [prefix]/part-00001[suffix]
-   */
-  public static final String DIRECTORY_CONTAINER = "/part-SSSSS";
-}
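
The S/N substitution rule described in the ShardNameTemplate javadoc can be illustrated with a small stand-alone helper. The expand method below is a hypothetical demonstration of the documented rule (runs of 'S' and 'N' replaced by the zero-padded shard number and shard count), not the SDK's internal implementation.

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    class ShardNameDemo {
      // Replaces each run of 'S' with the zero-padded shard number and each run
      // of 'N' with the zero-padded shard count, padded to the run's length.
      static String expand(String template, int shardNumber, int shardCount) {
        StringBuilder out = new StringBuilder();
        Matcher m = Pattern.compile("S+|N+").matcher(template);
        int last = 0;
        while (m.find()) {
          out.append(template, last, m.start());
          int width = m.end() - m.start();
          int value = template.charAt(m.start()) == 'S' ? shardNumber : shardCount;
          out.append(String.format("%0" + width + "d", value));
          last = m.end();
        }
        out.append(template.substring(last));
        return out.toString();
      }

      public static void main(String[] args) {
        // "-SSSSS-of-NNNNN" (INDEX_OF_MAX) -> "-00000-of-01000" for shard 0 of 1000
        System.out.println(expand("-SSSSS-of-NNNNN", 0, 1000));
        // "/part-SSSSS" (DIRECTORY_CONTAINER) -> "/part-00003" for shard 3
        System.out.println(expand("/part-SSSSS", 3, 100));
      }
    }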

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Sink.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Sink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Sink.java
deleted file mode 100644
index a5649ce..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Sink.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-import java.io.Serializable;
-
-/**
- * A {@code Sink} represents a resource that can be written to using the {@link Write} transform.
- *
- * <p>A parallel write to a {@code Sink} consists of three phases:
- * <ol>
- * <li>A sequential <i>initialization</i> phase (e.g., creating a temporary output directory, etc.)
- * <li>A <i>parallel write</i> phase where workers write bundles of records
- * <li>A sequential <i>finalization</i> phase (e.g., committing the writes, merging output files,
- * etc.)
- * </ol>
- *
- * <p>The {@link Write} transform can be used in a Dataflow pipeline to perform this write.
- * Specifically, a Write transform can be applied to a {@link PCollection} {@code p} by:
- *
- * <p>{@code p.apply(Write.to(new MySink()));}
- *
- * <p>Implementing a {@link Sink} and the corresponding write operations requires extending three
- * abstract classes:
- *
- * <ul>
- * <li>{@link Sink}: an immutable logical description of the location/resource to write to.
- * Depending on the type of sink, it may contain fields such as the path to an output directory
- * on a filesystem, a database table name, etc. Implementors of {@link Sink} must
- * implement two methods: {@link Sink#validate} and {@link Sink#createWriteOperation}.
- * {@link Sink#validate Validate} is called by the Write transform at pipeline creation, and should
- * validate that the Sink can be written to. The createWriteOperation method is also called at
- * pipeline creation, and should return a WriteOperation object that defines how to write to the
- * Sink. Note that implementations of Sink must be serializable and Sinks must be immutable.
- *
- * <li>{@link WriteOperation}: The WriteOperation implements the <i>initialization</i> and
- * <i>finalization</i> phases of a write. Implementors of {@link WriteOperation} must implement
- * corresponding {@link WriteOperation#initialize} and {@link WriteOperation#finalize} methods. A
- * WriteOperation must also implement {@link WriteOperation#createWriter} that creates Writers,
- * {@link WriteOperation#getWriterResultCoder} that returns a {@link Coder} for the result of a
- * parallel write, and a {@link WriteOperation#getSink} that returns the Sink that the write
- * operation corresponds to. See below for more information about these methods and restrictions on
- * their implementation.
- *
- * <li>{@link Writer}: A Writer writes a bundle of records. Writer defines four methods:
- * {@link Writer#open}, which is called once at the start of writing a bundle; {@link Writer#write},
- * which writes a single record from the bundle; {@link Writer#close}, which is called once at the
- * end of writing a bundle; and {@link Writer#getWriteOperation}, which returns the write operation
- * that the writer belongs to.
- * </ul>
- *
- * <h2>WriteOperation</h2>
- * <p>{@link WriteOperation#initialize} and {@link WriteOperation#finalize} are conceptually called
- * once: at the beginning and end of a Write transform. However, implementors must ensure that these
- * methods are idempotent, as they may be called multiple times on different machines in the case of
- * failure/retry or for redundancy.
- *
- * <p>The finalize method of WriteOperation is passed an Iterable of a writer result type. This
- * writer result type should encode the result of a write and, in most cases, some encoding of the
- * unique bundle id.
- *
- * <p>All implementations of {@link WriteOperation} must be serializable.
- *
- * <p>WriteOperation may have mutable state. For instance, {@link WriteOperation#initialize} may
- * mutate the object state. These mutations will be visible in {@link WriteOperation#createWriter}
- * and {@link WriteOperation#finalize} because the object will be serialized after initialize and
- * deserialized before these calls. However, it is not serialized again after createWriter is
- * called, as createWriter will be called within workers to create Writers for the bundles that are
- * distributed to these workers. Therefore, createWriter should not mutate the WriteOperation state (as
- * these mutations will not be visible in finalize).
- *
- * <h2>Bundle Ids:</h2>
- * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g., in the
- * event of failure/retry or for redundancy). However, exactly one of these executions will have its
- * result passed to the WriteOperation's finalize method. Each call to {@link Writer#open} is passed
- * a unique <i>bundle id</i> when it is called by the Write transform, so even redundant or retried
- * bundles will have a unique way of identifying their output.
- *
- * <p>The bundle id should be used to guarantee that a bundle's output is unique. This uniqueness
- * guarantee is important; if a bundle is to be output to a file, for example, the name of the file
- * must be unique to avoid conflicts with other Writers. The bundle id should be encoded in the
- * writer result returned by the Writer and subsequently used by the WriteOperation's finalize
- * method to identify the results of successful writes.
- *
- * <p>For example, consider the scenario where a Writer writes files containing serialized records
- * and the WriteOperation's finalization step is to merge or rename these output files. In this
- * case, a Writer may use its unique id to name its output file (to avoid conflicts) and return the
- * name of the file it wrote as its writer result. The WriteOperation will then receive an Iterable
- * of output file names that it can then merge or rename using some bundle naming scheme.
- *
- * <h2>Writer Results:</h2>
- * <p>{@link WriteOperation}s and {@link Writer}s must agree on a writer result type that will be
- * returned by a Writer after it writes a bundle. This type can be a client-defined object or an
- * existing type; {@link WriteOperation#getWriterResultCoder} should return a {@link Coder} for the
- * type.
- *
- * <p>A note about thread safety: Any use of static members or methods in Writer should be thread
- * safe, as different instances of Writer objects may be created in different threads on the same
- * worker.
- *
- * @param <T> the type that will be written to the Sink.
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public abstract class Sink<T> implements Serializable {
-  /**
-   * Ensures that the sink is valid and can be written to before the write operation begins. One
-   * should use {@link com.google.common.base.Preconditions} to implement this method.
-   */
-  public abstract void validate(PipelineOptions options);
-
-  /**
-   * Returns an instance of a {@link WriteOperation} that can write to this Sink.
-   */
-  public abstract WriteOperation<T, ?> createWriteOperation(PipelineOptions options);
-
-  /**
-   * A {@link WriteOperation} defines the process of a parallel write of objects to a Sink.
-   *
-   * <p>The {@code WriteOperation} defines how to perform initialization and finalization of a
-   * parallel write to a sink as well as how to create a {@link Sink.Writer} object that can write
-   * a bundle to the sink.
-   *
-   * <p>Since operations in Dataflow may be run multiple times for redundancy or fault-tolerance,
-   * the initialization and finalization defined by a WriteOperation <b>must be idempotent</b>.
-   *
-   * <p>{@code WriteOperation}s may be mutable; a {@code WriteOperation} is serialized after the
-   * call to {@code initialize} and deserialized before calls to
-   * {@code createWriter} and {@code finalize}. However, it is not
-   * reserialized after {@code createWriter}, so {@code createWriter} should not mutate the
-   * state of the {@code WriteOperation}.
-   *
-   * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
-   *
-   * @param <T> The type of objects to write
-   * @param <WriteT> The result of a per-bundle write
-   */
-  public abstract static class WriteOperation<T, WriteT> implements Serializable {
-    /**
-     * Performs initialization before writing to the sink. Called before writing begins.
-     */
-    public abstract void initialize(PipelineOptions options) throws Exception;
-
-    /**
-     * Given an Iterable of results from bundle writes, performs finalization after writing and
-     * closes the sink. Called after all bundle writes are complete.
-     *
-     * <p>The results that are passed to finalize are those returned by bundles that completed
-     * successfully. Although bundles may have been run multiple times (for fault-tolerance), only
-     * one writer result will be passed to finalize for each bundle. An implementation of finalize
-     * should perform clean up of any failed and successfully retried bundles.  Note that these
-     * failed bundles will not have their writer result passed to finalize, so finalize should be
-     * capable of locating any temporary/partial output written by failed bundles.
-     *
-     * <p>A best practice is to make finalize atomic. If this is impossible given the semantics
-     * of the sink, finalize should be idempotent, as it may be called multiple times in the case of
-     * failure/retry or for redundancy.
-     *
-     * <p>Note that the iteration order of the writer results is not guaranteed to be consistent if
-     * finalize is called multiple times.
-     *
-     * @param writerResults an Iterable of results from successful bundle writes.
-     */
-    public abstract void finalize(Iterable<WriteT> writerResults, PipelineOptions options)
-        throws Exception;
-
-    /**
-     * Creates a new {@link Sink.Writer} to write a bundle of the input to the sink.
-     *
-     * <p>The bundle id that the writer will use to uniquely identify its output will be passed to
-     * {@link Writer#open}.
-     *
-     * <p>Must not mutate the state of the WriteOperation.
-     */
-    public abstract Writer<T, WriteT> createWriter(PipelineOptions options) throws Exception;
-
-    /**
-     * Returns the Sink that this write operation writes to.
-     */
-    public abstract Sink<T> getSink();
-
-    /**
-     * Returns a coder for the writer result type.
-     */
-    public Coder<WriteT> getWriterResultCoder() {
-      return null;
-    }
-  }
-
-  /**
-   * A Writer writes a bundle of elements from a PCollection to a sink. {@link Writer#open} is
-   * called before writing begins and {@link Writer#close} is called after all elements in the
-   * bundle have been written. {@link Writer#write} writes an element to the sink.
-   *
-   * <p>Note that any access to static members or methods of a Writer must be thread-safe, as
-   * multiple instances of a Writer may be instantiated in different threads on the same worker.
-   *
-   * <p>See {@link Sink} for more detailed documentation about the process of writing to a Sink.
-   *
-   * @param <T> The type of object to write
-   * @param <WriteT> The writer results type (e.g., the bundle's output filename, as String)
-   */
-  public abstract static class Writer<T, WriteT> {
-    /**
-     * Performs bundle initialization. For example, creates a temporary file for writing or
-     * initializes any state that will be used across calls to {@link Writer#write}.
-     *
-     * <p>The unique id that is given to open should be used to ensure that the writer's output does
-     * not interfere with the output of other Writers, as a bundle may be executed many times for
-     * fault tolerance. See {@link Sink} for more information about bundle ids.
-     */
-    public abstract void open(String uId) throws Exception;
-
-    /**
-     * Called for each value in the bundle.
-     */
-    public abstract void write(T value) throws Exception;
-
-    /**
-     * Finishes writing the bundle. Closes any resources used for writing the bundle.
-     *
-     * <p>Returns a writer result that will be used in the {@link Sink.WriteOperation}'s
-     * finalization. The result should contain some way to identify the output of this bundle (using
-     * the bundle id). {@link WriteOperation#finalize} will use the writer result to identify
-     * successful writes. See {@link Sink} for more information about bundle ids.
-     *
-     * @return the writer result
-     */
-    public abstract WriteT close() throws Exception;
-
-    /**
-     * Returns the write operation this writer belongs to.
-     */
-    public abstract WriteOperation<T, WriteT> getWriteOperation();
-  }
-}
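
To make the three-phase contract described above concrete, here is a hedged sketch of a minimal Sink: each Writer writes strings to a per-bundle temporary file named by the bundle id, and finalization promotes the temporary files to their final names. The LocalTextSink, LocalWriteOperation, and LocalWriter names and the local-filesystem strategy are illustrative assumptions, not part of this commit.

    import com.google.cloud.dataflow.sdk.coders.Coder;
    import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
    import com.google.cloud.dataflow.sdk.options.PipelineOptions;

    import java.io.BufferedWriter;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;

    class LocalTextSink extends Sink<String> {
      private final String outputDir;  // assumption: an existing local directory

      LocalTextSink(String outputDir) { this.outputDir = outputDir; }

      @Override
      public void validate(PipelineOptions options) {
        if (!Files.isDirectory(Paths.get(outputDir))) {
          throw new IllegalArgumentException("Not a directory: " + outputDir);
        }
      }

      @Override
      public WriteOperation<String, String> createWriteOperation(PipelineOptions options) {
        return new LocalWriteOperation(this);
      }

      static class LocalWriteOperation extends WriteOperation<String, String> {
        private final LocalTextSink sink;

        LocalWriteOperation(LocalTextSink sink) { this.sink = sink; }

        @Override
        public void initialize(PipelineOptions options) {}  // nothing to set up

        @Override
        public void finalize(Iterable<String> writerResults, PipelineOptions options)
            throws Exception {
          // Promote each successfully written temporary file to a final name.
          // (A production Sink would also make this idempotent and clean up
          // temporary output left behind by failed bundles, per the contract above.)
          int shard = 0;
          for (String tempFile : writerResults) {
            Files.move(Paths.get(tempFile), Paths.get(sink.outputDir, "part-" + shard++),
                StandardCopyOption.REPLACE_EXISTING);
          }
        }

        @Override
        public Writer<String, String> createWriter(PipelineOptions options) {
          return new LocalWriter(this);
        }

        @Override
        public Sink<String> getSink() { return sink; }

        @Override
        public Coder<String> getWriterResultCoder() { return StringUtf8Coder.of(); }
      }

      static class LocalWriter extends Writer<String, String> {
        private final LocalWriteOperation op;
        private Path tempFile;
        private BufferedWriter out;

        LocalWriter(LocalWriteOperation op) { this.op = op; }

        @Override
        public void open(String uId) throws Exception {
          // The unique bundle id keeps retried bundles from clobbering each other's output.
          tempFile = Paths.get(op.sink.outputDir, "temp-" + uId);
          out = Files.newBufferedWriter(tempFile, StandardCharsets.UTF_8);
        }

        @Override
        public void write(String value) throws Exception {
          out.write(value);
          out.newLine();
        }

        @Override
        public String close() throws Exception {
          out.close();
          return tempFile.toString();  // the writer result identifies this bundle's output
        }

        @Override
        public WriteOperation<String, String> getWriteOperation() { return op; }
      }
    }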

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Source.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Source.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Source.java
deleted file mode 100644
index 4a02078..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/Source.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.NoSuchElementException;
-
-/**
- * Base class for defining input formats and creating a {@code Source} for reading the input.
- *
- * <p>This class is not intended to be subclassed directly. Instead, to define
- * a bounded source (a source which produces a finite amount of input), subclass
- * {@link BoundedSource}; to define an unbounded source, subclass {@link UnboundedSource}.
- *
- * <p>A {@code Source} passed to a {@code Read} transform must be
- * {@code Serializable}.  This allows the {@code Source} instance
- * created in this "main program" to be sent (in serialized form) to
- * remote worker machines and reconstituted for each batch of elements
- * of the input {@code PCollection} being processed or for each source splitting
- * operation. A {@code Source} can have instance variable state, and
- * non-transient instance variable state will be serialized in the main program
- * and then deserialized on remote worker machines.
- *
- * <p>{@code Source} classes MUST be effectively immutable. The only acceptable use of
- * mutable fields is to cache the results of expensive operations, and such fields MUST be
- * marked {@code transient}.
- *
- * <p>{@code Source} objects should override {@link Object#toString}, as it will be
- * used in important error and debugging messages.
- *
- * @param <T> Type of elements read by the source.
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public abstract class Source<T> implements Serializable {
-  /**
-   * Checks that this source is valid, before it can be used in a pipeline.
-   *
-   * <p>It is recommended to use {@link com.google.common.base.Preconditions} for implementing
-   * this method.
-   */
-  public abstract void validate();
-
-  /**
-   * Returns the default {@code Coder} to use for the data read from this source.
-   */
-  public abstract Coder<T> getDefaultOutputCoder();
-
-  /**
-   * The interface that readers of custom input sources must implement.
-   *
-   * <p>This interface is deliberately distinct from {@link java.util.Iterator} because
-   * the current model tends to be easier to program and more efficient in practice
-   * for iterating over sources such as files, databases etc. (rather than pure collections).
-   *
-   * <p>Reading data from the {@link Reader} must obey the following access pattern:
-   * <ul>
-   * <li> One call to {@link #start}
-   * <ul><li>If {@link #start} returned true, any number of calls to {@code getCurrent}*
-   *   methods</ul>
-   * <li> Repeatedly, a call to {@link #advance}. This may be called regardless
-   *   of what the previous {@link #start}/{@link #advance} returned.
-   * <ul><li>If {@link #advance} returned true, any number of calls to {@code getCurrent}*
-   *   methods</ul>
-   * </ul>
-   *
-   * <p>For example, if the reader is reading a fixed set of data:
-   * <pre>
-   *   try {
-   *     for (boolean available = reader.start(); available; available = reader.advance()) {
-   *       T item = reader.getCurrent();
-   *       Instant timestamp = reader.getCurrentTimestamp();
-   *       ...
-   *     }
-   *   } finally {
-   *     reader.close();
-   *   }
-   * </pre>
-   *
-   * <p>If the set of data being read is continually growing:
-   * <pre>
-   *   try {
-   *     boolean available = reader.start();
-   *     while (true) {
-   *       if (available) {
-   *         T item = reader.getCurrent();
-   *         Instant timestamp = reader.getCurrentTimestamp();
-   *         ...
-   *         resetExponentialBackoff();
-   *       } else {
-   *         exponentialBackoff();
-   *       }
-   *       available = reader.advance();
-   *     }
-   *   } finally {
-   *     reader.close();
-   *   }
-   * </pre>
-   *
-   * <p>Note: this interface is a work-in-progress and may change.
-   *
-   * <p>{@code Reader} functions other than {@link #getCurrentSource} need not be thread-safe;
-   * they will only be accessed by a single thread at a time. However, {@link #getCurrentSource} must
-   * be thread-safe, and other functions should assume that its returned value can change
-   * asynchronously.
-   */
-  public abstract static class Reader<T> implements AutoCloseable {
-    /**
-     * Initializes the reader and advances the reader to the first record.
-     *
-     * <p>This method should be called exactly once. The invocation should occur prior to calling
-     * {@link #advance} or {@link #getCurrent}. This method may perform expensive operations that
-     * are needed to initialize the reader.
-     *
-     * @return {@code true} if a record was read, {@code false} if there is no more input available.
-     */
-    public abstract boolean start() throws IOException;
-
-    /**
-     * Advances the reader to the next valid record.
-     *
-     * <p>It is an error to call this without having called {@link #start} first.
-     *
-     * @return {@code true} if a record was read, {@code false} if there is no more input available.
-     */
-    public abstract boolean advance() throws IOException;
-
-    /**
-     * Returns the value of the data item that was read by the last {@link #start} or
-     * {@link #advance} call. The returned value must be effectively immutable and remain valid
-     * indefinitely.
-     *
-     * <p>Multiple calls to this method without an intervening call to {@link #advance} should
-     * return the same result.
-     *
-     * @throws java.util.NoSuchElementException if {@link #start} was never called, or if
-     *         the last {@link #start} or {@link #advance} returned {@code false}.
-     */
-    public abstract T getCurrent() throws NoSuchElementException;
-
-    /**
-     * Returns the timestamp associated with the current data item.
-     *
-     * <p>If the source does not support timestamps, this should return
-     * {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
-     *
-     * <p>Multiple calls to this method without an intervening call to {@link #advance} should
-     * return the same result.
-     *
-     * @throws NoSuchElementException if the reader is at the beginning of the input and
-     *         {@link #start} or {@link #advance} wasn't called, or if the last {@link #start} or
-     *         {@link #advance} returned {@code false}.
-     */
-    public abstract Instant getCurrentTimestamp() throws NoSuchElementException;
-
-    /**
-     * Closes the reader. The reader cannot be used after this method is called.
-     */
-    @Override
-    public abstract void close() throws IOException;
-
-    /**
-     * Returns a {@code Source} describing the same input that this {@code Reader} currently reads
-     * (including items already read).
-     *
-     * <p>Usually, an implementation will simply return the immutable {@link Source} object from
-     * which the current {@link Reader} was constructed, or delegate to the base class.
-     * However, when using or implementing this method on a {@link BoundedSource.BoundedReader},
-     * special considerations apply; see documentation for
-     * {@link BoundedSource.BoundedReader#getCurrentSource}.
-     */
-    public abstract Source<T> getCurrentSource();
-  }
-}
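
As a complement to the access-pattern examples in the Reader javadoc above, here is a hedged sketch of a reader that follows the start/advance/getCurrent contract by iterating over an in-memory list. The InMemoryReader name is hypothetical; the owning Source is passed in rather than implemented, and the BoundedWindow import path is assumed from the SDK's package layout.

    import com.google.cloud.dataflow.sdk.io.Source;
    import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;

    import org.joda.time.Instant;

    import java.io.IOException;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;

    class InMemoryReader extends Source.Reader<String> {
      private final Source<String> source;
      private final Iterator<String> iterator;
      private String current;       // value read by the last successful start()/advance()
      private boolean hasCurrent;

      InMemoryReader(Source<String> source, List<String> data) {
        this.source = source;
        this.iterator = data.iterator();
      }

      @Override
      public boolean start() throws IOException {
        return advance();  // no expensive initialization needed for an in-memory list
      }

      @Override
      public boolean advance() throws IOException {
        hasCurrent = iterator.hasNext();
        current = hasCurrent ? iterator.next() : null;
        return hasCurrent;
      }

      @Override
      public String getCurrent() throws NoSuchElementException {
        if (!hasCurrent) {
          throw new NoSuchElementException();
        }
        return current;
      }

      @Override
      public Instant getCurrentTimestamp() throws NoSuchElementException {
        getCurrent();  // enforce the same precondition as getCurrent()
        return BoundedWindow.TIMESTAMP_MIN_VALUE;  // this reader has no meaningful timestamps
      }

      @Override
      public void close() throws IOException {}  // nothing to release

      @Override
      public Source<String> getCurrentSource() {
        return source;
      }
    }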

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java
deleted file mode 100644
index d342f25..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java
+++ /dev/null
@@ -1,992 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.Coder.Context;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import com.google.cloud.dataflow.sdk.io.Read.Bounded;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
-import com.google.cloud.dataflow.sdk.util.MimeTypes;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PDone;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.nio.charset.StandardCharsets;
-import java.util.NoSuchElementException;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nullable;
-
-/**
- * {@link PTransform}s for reading and writing text files.
- *
- * <p>To read a {@link PCollection} from one or more text files, use {@link TextIO.Read}.
- * You can instantiate a transform using {@link TextIO.Read#from(String)} to specify
- * the path of the file(s) to read from (e.g., a local filename or
- * filename pattern if running locally, or a Google Cloud Storage
- * filename or filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}). You may optionally call
- * {@link TextIO.Read#named(String)} to specify the name of the pipeline step.
- *
- * <p>By default, {@link TextIO.Read} returns a {@link PCollection} of {@link String Strings},
- * each corresponding to one line of an input UTF-8 text file. To convert directly from the raw
- * bytes (split into lines delimited by '\n', '\r', or '\r\n') to another object of type {@code T},
- * supply a {@code Coder<T>} using {@link TextIO.Read#withCoder(Coder)}.
- *
- * <p>See the following examples:
- *
- * <pre>{@code
- * Pipeline p = ...;
- *
- * // A simple Read of a local file (only runs locally):
- * PCollection<String> lines =
- *     p.apply(TextIO.Read.from("/local/path/to/file.txt"));
- *
- * // A fully-specified Read from a GCS file (runs locally and via the
- * // Google Cloud Dataflow service):
- * PCollection<Integer> numbers =
- *     p.apply(TextIO.Read.named("ReadNumbers")
- *                        .from("gs://my_bucket/path/to/numbers-*.txt")
- *                        .withCoder(TextualIntegerCoder.of()));
- * }</pre>
- *
- * <p>To write a {@link PCollection} to one or more text files, use
- * {@link TextIO.Write}, specifying {@link TextIO.Write#to(String)} to specify
- * the path of the file to write to (e.g., a local filename or sharded
- * filename pattern if running locally, or a Google Cloud Storage
- * filename or sharded filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}). You can optionally name the resulting transform using
- * {@link TextIO.Write#named(String)}, and you can use {@link TextIO.Write#withCoder(Coder)}
- * to specify the Coder to use to encode the Java values into text lines.
- *
- * <p>Any existing files with the same names as generated output files
- * will be overwritten.
- *
- * <p>For example:
- * <pre>{@code
- * // A simple Write to a local file (only runs locally):
- * PCollection<String> lines = ...;
- * lines.apply(TextIO.Write.to("/path/to/file.txt"));
- *
- * // A fully-specified Write to a sharded GCS file (runs locally and via the
- * // Google Cloud Dataflow service):
- * PCollection<Integer> numbers = ...;
- * numbers.apply(TextIO.Write.named("WriteNumbers")
- *                           .to("gs://my_bucket/path/to/numbers")
- *                           .withSuffix(".txt")
- *                           .withCoder(TextualIntegerCoder.of()));
- * }</pre>
- *
- * <h3>Permissions</h3>
- * <p>When run using the {@link DirectPipelineRunner}, your pipeline can read and write text files
- * on your local drive and remote text files on Google Cloud Storage that you have access to using
- * your {@code gcloud} credentials. When running in the Dataflow service using
- * {@link DataflowPipelineRunner}, the pipeline can only read and write files from GCS. For more
- * information about permissions, see the Cloud Dataflow documentation on
- * <a href="https://cloud.google.com/dataflow/security-and-permissions">Security and
- * Permissions</a>.
- */
-public class TextIO {
-  /** The default coder, which returns each line of the input file as a string. */
-  public static final Coder<String> DEFAULT_TEXT_CODER = StringUtf8Coder.of();
-
-  /**
-   * A {@link PTransform} that reads from a text file (or multiple text
-   * files matching a pattern) and returns a {@link PCollection} containing
-   * the decoding of each of the lines of the text file(s). The
-   * default decoding just returns each line as a {@link String}, but you may call
-   * {@link #withCoder(Coder)} to change the return type.
-   */
-  public static class Read {
-    /**
-     * Returns a transform for reading text files that uses the given step name.
-     */
-    public static Bound<String> named(String name) {
-      return new Bound<>(DEFAULT_TEXT_CODER).named(name);
-    }
-
-    /**
-     * Returns a transform for reading text files that reads from the file(s)
-     * with the given filename or filename pattern. This can be a local path (if running locally),
-     * or a Google Cloud Storage filename or filename pattern of the form
-     * {@code "gs://<bucket>/<filepath>"} (if running locally or via the Google Cloud Dataflow
-     * service). Standard <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html"
-     * >Java Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
-     */
-    public static Bound<String> from(String filepattern) {
-      return new Bound<>(DEFAULT_TEXT_CODER).from(filepattern);
-    }
-
-    /**
-     * Returns a transform for reading text files that uses the given
-     * {@code Coder<T>} to decode each of the lines of the file into a
-     * value of type {@code T}.
-     *
-     * <p>By default, uses {@link StringUtf8Coder}, which just
-     * returns the text lines as Java strings.
-     *
-     * @param <T> the type of the decoded elements, and the elements
-     * of the resulting PCollection
-     */
-    public static <T> Bound<T> withCoder(Coder<T> coder) {
-      return new Bound<>(coder);
-    }
-
-    /**
-     * Returns a transform for reading text files that has GCS path validation on
-     * pipeline creation disabled.
-     *
-     * <p>This can be useful in the case where the GCS input does not
-     * exist at the pipeline creation time, but is expected to be
-     * available at execution time.
-     */
-    public static Bound<String> withoutValidation() {
-      return new Bound<>(DEFAULT_TEXT_CODER).withoutValidation();
-    }
-
-    /**
-     * Returns a transform for reading text files that decompresses all input files
-     * using the specified compression type.
-     *
-     * <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}.
-     * In this mode, the compression type of the file is determined by its extension
-     * (e.g., {@code *.gz} is gzipped, {@code *.bz2} is bzipped, and all other extensions are
-     * uncompressed).
-     */
-    public static Bound<String> withCompressionType(TextIO.CompressionType compressionType) {
-      return new Bound<>(DEFAULT_TEXT_CODER).withCompressionType(compressionType);
-    }
-
-    // TODO: strippingNewlines, etc.
-
-    /**
-     * A {@link PTransform} that reads from one or more text files and returns a bounded
-     * {@link PCollection} containing one element for each line of the input files.
-     *
-     * @param <T> the type of each of the elements of the resulting
-     * {@link PCollection}. By default, each line is returned as a {@link String}, however you
-     * may use {@link #withCoder(Coder)} to supply a {@code Coder<T>} to produce a
-     * {@code PCollection<T>} instead.
-     */
-    public static class Bound<T> extends PTransform<PInput, PCollection<T>> {
-      /** The filepattern to read from. */
-      @Nullable private final String filepattern;
-
-      /** The Coder to use to decode each line. */
-      private final Coder<T> coder;
-
-      /** An option to indicate if input validation is desired. Default is true. */
-      private final boolean validate;
-
-      /** Option to indicate the input source's compression type. Default is AUTO. */
-      private final TextIO.CompressionType compressionType;
-
-      Bound(Coder<T> coder) {
-        this(null, null, coder, true, TextIO.CompressionType.AUTO);
-      }
-
-      private Bound(String name, String filepattern, Coder<T> coder, boolean validate,
-          TextIO.CompressionType compressionType) {
-        super(name);
-        this.coder = coder;
-        this.filepattern = filepattern;
-        this.validate = validate;
-        this.compressionType = compressionType;
-      }
-
-      /**
-       * Returns a new transform for reading from text files that's like this one but
-       * with the given step name.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> named(String name) {
-        return new Bound<>(name, filepattern, coder, validate, compressionType);
-      }
-
-      /**
-       * Returns a new transform for reading from text files that's like this one but
-       * that reads from the file(s) with the given name or pattern. See {@link TextIO.Read#from}
-       * for a description of filepatterns.
-       *
-       * <p>Does not modify this object.
-       *
-       */
-      public Bound<T> from(String filepattern) {
-        return new Bound<>(name, filepattern, coder, validate, compressionType);
-      }
-
-      /**
-       * Returns a new transform for reading from text files that's like this one but
-       * that uses the given {@link Coder Coder<X>} to decode each of the
-       * lines of the file into a value of type {@code X}.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param <X> the type of the decoded elements, and the
-       * elements of the resulting PCollection
-       */
-      public <X> Bound<X> withCoder(Coder<X> coder) {
-        return new Bound<>(name, filepattern, coder, validate, compressionType);
-      }
-
-      /**
-       * Returns a new transform for reading from text files that's like this one but
-       * that has GCS path validation on pipeline creation disabled.
-       *
-       * <p>This can be useful in the case where the GCS input does not
-       * exist at the pipeline creation time, but is expected to be
-       * available at execution time.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> withoutValidation() {
-        return new Bound<>(name, filepattern, coder, false, compressionType);
-      }
-
-      /**
-       * Returns a new transform for reading from text files that's like this one but
-       * reads from input sources using the specified compression type.
-       *
-       * <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}.
-       * See {@link TextIO.Read#withCompressionType} for more details.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> withCompressionType(TextIO.CompressionType compressionType) {
-        return new Bound<>(name, filepattern, coder, validate, compressionType);
-      }
-
-      @Override
-      public PCollection<T> apply(PInput input) {
-        if (filepattern == null) {
-          throw new IllegalStateException("need to set the filepattern of a TextIO.Read transform");
-        }
-
-        if (validate) {
-          try {
-            checkState(
-                !IOChannelUtils.getFactory(filepattern).match(filepattern).isEmpty(),
-                "Unable to find any files matching %s",
-                filepattern);
-          } catch (IOException e) {
-            throw new IllegalStateException(
-                String.format("Failed to validate %s", filepattern), e);
-          }
-        }
-
-        // Create a source specific to the requested compression type.
-        final Bounded<T> read;
-        switch(compressionType) {
-          case UNCOMPRESSED:
-            read = com.google.cloud.dataflow.sdk.io.Read.from(
-                new TextSource<T>(filepattern, coder));
-            break;
-          case AUTO:
-            read = com.google.cloud.dataflow.sdk.io.Read.from(
-                CompressedSource.from(new TextSource<T>(filepattern, coder)));
-            break;
-          case BZIP2:
-            read = com.google.cloud.dataflow.sdk.io.Read.from(
-                CompressedSource.from(new TextSource<T>(filepattern, coder))
-                                .withDecompression(CompressedSource.CompressionMode.BZIP2));
-            break;
-          case GZIP:
-            read = com.google.cloud.dataflow.sdk.io.Read.from(
-                CompressedSource.from(new TextSource<T>(filepattern, coder))
-                                .withDecompression(CompressedSource.CompressionMode.GZIP));
-            break;
-          default:
-            throw new IllegalArgumentException("Unknown compression mode: " + compressionType);
-        }
-
-        PCollection<T> pcol = input.getPipeline().apply("Read", read);
-        // Honor the default output coder that would have been used by this PTransform.
-        pcol.setCoder(getDefaultOutputCoder());
-        return pcol;
-      }
-
-      @Override
-      protected Coder<T> getDefaultOutputCoder() {
-        return coder;
-      }
-
-      public String getFilepattern() {
-        return filepattern;
-      }
-
-      public boolean needsValidation() {
-        return validate;
-      }
-
-      public TextIO.CompressionType getCompressionType() {
-        return compressionType;
-      }
-    }
-
-    /** Disallow construction of utility classes. */
-    private Read() {}
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * A {@link PTransform} that writes a {@link PCollection} to a text file (or
-   * multiple text files matching a sharding pattern), with each
-   * element of the input collection encoded into its own line.
-   */
-  public static class Write {
-    /**
-     * Returns a transform for writing to text files with the given step name.
-     */
-    public static Bound<String> named(String name) {
-      return new Bound<>(DEFAULT_TEXT_CODER).named(name);
-    }
-
-    /**
-     * Returns a transform for writing to text files that writes to the file(s)
-     * with the given prefix. This can be a local filename
-     * (if running locally), or a Google Cloud Storage filename of
-     * the form {@code "gs://<bucket>/<filepath>"}
-     * (if running locally or via the Google Cloud Dataflow service).
-     *
-     * <p>The files written will begin with this prefix, followed by
-     * a shard identifier (see {@link Bound#withNumShards(int)}), and end
-     * in a common extension, if given by {@link Bound#withSuffix(String)}.
-     */
-    public static Bound<String> to(String prefix) {
-      return new Bound<>(DEFAULT_TEXT_CODER).to(prefix);
-    }
-
-    /**
-     * Returns a transform for writing to text files that appends the specified suffix
-     * to the created files.
-     */
-    public static Bound<String> withSuffix(String nameExtension) {
-      return new Bound<>(DEFAULT_TEXT_CODER).withSuffix(nameExtension);
-    }
-
-    /**
-     * Returns a transform for writing to text files that uses the provided shard count.
-     *
-     * <p>Constraining the number of shards is likely to reduce
-     * the performance of a pipeline. Setting this value is not recommended
-     * unless you require a specific number of output files.
-     *
-     * @param numShards the number of shards to use, or 0 to let the system
-     *                  decide.
-     */
-    public static Bound<String> withNumShards(int numShards) {
-      return new Bound<>(DEFAULT_TEXT_CODER).withNumShards(numShards);
-    }
-
-    /**
-     * Returns a transform for writing to text files that uses the given shard name
-     * template.
-     *
-     * <p>See {@link ShardNameTemplate} for a description of shard templates.
-     */
-    public static Bound<String> withShardNameTemplate(String shardTemplate) {
-      return new Bound<>(DEFAULT_TEXT_CODER).withShardNameTemplate(shardTemplate);
-    }
-
-    /**
-     * Returns a transform for writing to text files that forces a single file as
-     * output.
-     */
-    public static Bound<String> withoutSharding() {
-      return new Bound<>(DEFAULT_TEXT_CODER).withoutSharding();
-    }
-
-    /**
-     * Returns a transform for writing to text files that uses the given
-     * {@link Coder} to encode each of the elements of the input
-     * {@link PCollection} into an output text line.
-     *
-     * <p>By default, uses {@link StringUtf8Coder}, which writes input
-     * Java strings directly as output lines.
-     *
-     * @param <T> the type of the elements of the input {@link PCollection}
-     */
-    public static <T> Bound<T> withCoder(Coder<T> coder) {
-      return new Bound<>(coder);
-    }
-
-    /**
-     * Returns a transform for writing to text files that has GCS path validation on
-     * pipeline creation disabled.
-     *
-     * <p>This can be useful in the case where the GCS output location does
-     * not exist at the pipeline creation time, but is expected to be available
-     * at execution time.
-     */
-    public static Bound<String> withoutValidation() {
-      return new Bound<>(DEFAULT_TEXT_CODER).withoutValidation();
-    }
-
-    // TODO: appendingNewlines, header, footer, etc.
-
-    /**
-     * A PTransform that writes a bounded PCollection to a text file (or
-     * multiple text files matching a sharding pattern), with each
-     * PCollection element being encoded into its own line.
-     *
-     * @param <T> the type of the elements of the input PCollection
-     */
-    public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
-      /** The prefix of each file written, combined with suffix and shardTemplate. */
-      @Nullable private final String filenamePrefix;
-      /** The suffix of each file written, combined with prefix and shardTemplate. */
-      private final String filenameSuffix;
-
-      /** The Coder to use to decode each line. */
-      private final Coder<T> coder;
-
-      /** Requested number of shards. 0 for automatic. */
-      private final int numShards;
-
-      /** The shard template of each file written, combined with prefix and suffix. */
-      private final String shardTemplate;
-
-      /** An option to indicate if output validation is desired. Default is true. */
-      private final boolean validate;
-
-      Bound(Coder<T> coder) {
-        this(null, null, "", coder, 0, ShardNameTemplate.INDEX_OF_MAX, true);
-      }
-
-      private Bound(String name, String filenamePrefix, String filenameSuffix, Coder<T> coder,
-          int numShards, String shardTemplate, boolean validate) {
-        super(name);
-        this.coder = coder;
-        this.filenamePrefix = filenamePrefix;
-        this.filenameSuffix = filenameSuffix;
-        this.numShards = numShards;
-        this.shardTemplate = shardTemplate;
-        this.validate = validate;
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * with the given step name.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> named(String name) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
-            shardTemplate, validate);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that writes to the file(s) with the given filename prefix.
-       *
-       * <p>See {@link TextIO.Write#to(String) Write.to(String)} for more information.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> to(String filenamePrefix) {
-        validateOutputComponent(filenamePrefix);
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
-            shardTemplate, validate);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that writes to the file(s) with the given filename suffix.
-       *
-       * <p>Does not modify this object.
-       *
-       * @see ShardNameTemplate
-       */
-      public Bound<T> withSuffix(String nameExtension) {
-        validateOutputComponent(nameExtension);
-        return new Bound<>(name, filenamePrefix, nameExtension, coder, numShards,
-            shardTemplate, validate);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that uses the provided shard count.
-       *
-       * <p>Constraining the number of shards is likely to reduce
-       * the performance of a pipeline. Setting this value is not recommended
-       * unless you require a specific number of output files.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param numShards the number of shards to use, or 0 to let the system
-       *                  decide.
-       * @see ShardNameTemplate
-       */
-      public Bound<T> withNumShards(int numShards) {
-        Preconditions.checkArgument(numShards >= 0);
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
-            shardTemplate, validate);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that uses the given shard name template.
-       *
-       * <p>Does not modify this object.
-       *
-       * @see ShardNameTemplate
-       */
-      public Bound<T> withShardNameTemplate(String shardTemplate) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
-            shardTemplate, validate);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that forces a single file as output.
-       *
-       * <p>Constraining the number of shards is likely to reduce
-       * the performance of a pipeline. Using this setting is not recommended
-       * unless you truly require a single output file.
-       *
-       * <p>This is a shortcut for
-       * {@code .withNumShards(1).withShardNameTemplate("")}
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> withoutSharding() {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 1, "", validate);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one
-       * but that uses the given {@link Coder Coder<X>} to encode each of
-       * the elements of the input {@link PCollection PCollection<X>} into an
-       * output text line. Does not modify this object.
-       *
-       * @param <X> the type of the elements of the input {@link PCollection}
-       */
-      public <X> Bound<X> withCoder(Coder<X> coder) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
-            shardTemplate, validate);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that has GCS output path validation on pipeline creation disabled.
-       *
-       * <p>This can be useful in the case where the GCS output location does
-       * not exist at the pipeline creation time, but is expected to be
-       * available at execution time.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> withoutValidation() {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
-            shardTemplate, false);
-      }
-
-      @Override
-      public PDone apply(PCollection<T> input) {
-        if (filenamePrefix == null) {
-          throw new IllegalStateException(
-              "need to set the filename prefix of a TextIO.Write transform");
-        }
-
-        // Note that custom sinks currently do not expose sharding controls.
-        // Thus pipeline runner authors need to add support internally for applying
-        // user-requested sharding limits.
-        return input.apply("Write", com.google.cloud.dataflow.sdk.io.Write.to(
-            new TextSink<>(
-                filenamePrefix, filenameSuffix, shardTemplate, coder)));
-      }
-
-      /**
-       * Returns the current shard name template string.
-       */
-      public String getShardNameTemplate() {
-        return shardTemplate;
-      }
-
-      @Override
-      protected Coder<Void> getDefaultOutputCoder() {
-        return VoidCoder.of();
-      }
-
-      public String getFilenamePrefix() {
-        return filenamePrefix;
-      }
-
-      public String getShardTemplate() {
-        return shardTemplate;
-      }
-
-      public int getNumShards() {
-        return numShards;
-      }
-
-      public String getFilenameSuffix() {
-        return filenameSuffix;
-      }
-
-      public Coder<T> getCoder() {
-        return coder;
-      }
-
-      public boolean needsValidation() {
-        return validate;
-      }
-    }
-  }
-
-  /**
-   * Possible text file compression types.
-   */
-  public static enum CompressionType {
-    /**
-     * Automatically determine the compression type based on filename extension.
-     */
-    AUTO(""),
-    /**
-     * Uncompressed (i.e., may be split).
-     */
-    UNCOMPRESSED(""),
-    /**
-     * GZipped.
-     */
-    GZIP(".gz"),
-    /**
-     * BZipped.
-     */
-    BZIP2(".bz2");
-
-    private String filenameSuffix;
-
-    private CompressionType(String suffix) {
-      this.filenameSuffix = suffix;
-    }
-
-    /**
-     * Determine if a given filename matches a compression type based on its extension.
-     * @param filename the filename to match
-     * @return true iff the filename ends with the compression type's known extension.
-     */
-    public boolean matches(String filename) {
-      return filename.toLowerCase().endsWith(filenameSuffix.toLowerCase());
-    }
-  }
-
-  // Pattern which matches old-style shard output patterns, which are now
-  // disallowed.
-  private static final Pattern SHARD_OUTPUT_PATTERN = Pattern.compile("@([0-9]+|\\*)");
-
-  private static void validateOutputComponent(String partialFilePattern) {
-    Preconditions.checkArgument(
-        !SHARD_OUTPUT_PATTERN.matcher(partialFilePattern).find(),
-        "Output name components are not allowed to contain @* or @N patterns: "
-        + partialFilePattern);
-  }
-
-  //////////////////////////////////////////////////////////////////////////////
-
-  /** Disable construction of utility class. */
-  private TextIO() {}
-
-  /**
-   * A {@link FileBasedSource} which can decode records delimited by new line characters.
-   *
-   * <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or
-   * {@code \r\n} as the delimiter. This source is not strict and supports decoding the last record
-   * even if it is not delimited. Finally, no records are decoded if the stream is empty.
-   *
-   * <p>This source supports reading from any arbitrary byte position within the stream. If the
-   * starting position is not {@code 0}, then bytes are skipped until the first delimiter is found
-   * representing the beginning of the first record to be decoded.
-   */
-  @VisibleForTesting
-  static class TextSource<T> extends FileBasedSource<T> {
-    /** The Coder to use to decode each line. */
-    private final Coder<T> coder;
-
-    @VisibleForTesting
-    TextSource(String fileSpec, Coder<T> coder) {
-      super(fileSpec, 1L);
-      this.coder = coder;
-    }
-
-    private TextSource(String fileName, long start, long end, Coder<T> coder) {
-      super(fileName, 1L, start, end);
-      this.coder = coder;
-    }
-
-    @Override
-    protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) {
-      return new TextSource<>(fileName, start, end, coder);
-    }
-
-    @Override
-    protected FileBasedReader<T> createSingleFileReader(PipelineOptions options) {
-      return new TextBasedReader<>(this);
-    }
-
-    @Override
-    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
-      return false;
-    }
-
-    @Override
-    public Coder<T> getDefaultOutputCoder() {
-      return coder;
-    }
-
-    /**
-     * A {@link com.google.cloud.dataflow.sdk.io.FileBasedSource.FileBasedReader FileBasedReader}
-     * which can decode records delimited by new line characters.
-     *
-     * See {@link TextSource} for further details.
-     */
-    @VisibleForTesting
-    static class TextBasedReader<T> extends FileBasedReader<T> {
-      private static final int READ_BUFFER_SIZE = 8192;
-      private final Coder<T> coder;
-      private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
-      private ByteString buffer;
-      private int startOfSeparatorInBuffer;
-      private int endOfSeparatorInBuffer;
-      private long startOfNextRecord;
-      private boolean eof;
-      private boolean elementIsPresent;
-      private T currentValue;
-      private ReadableByteChannel inChannel;
-
-      private TextBasedReader(TextSource<T> source) {
-        super(source);
-        coder = source.coder;
-        buffer = ByteString.EMPTY;
-      }
-
-      @Override
-      protected long getCurrentOffset() throws NoSuchElementException {
-        if (!elementIsPresent) {
-          throw new NoSuchElementException();
-        }
-        return startOfNextRecord;
-      }
-
-      @Override
-      public T getCurrent() throws NoSuchElementException {
-        if (!elementIsPresent) {
-          throw new NoSuchElementException();
-        }
-        return currentValue;
-      }
-
-      @Override
-      protected void startReading(ReadableByteChannel channel) throws IOException {
-        this.inChannel = channel;
-        // If the first offset is greater than zero, we need to skip bytes until we see our
-        // first separator.
-        if (getCurrentSource().getStartOffset() > 0) {
-          checkState(channel instanceof SeekableByteChannel,
-              "%s only supports reading from a SeekableByteChannel when given a start offset"
-              + " greater than 0.", TextSource.class.getSimpleName());
-          long requiredPosition = getCurrentSource().getStartOffset() - 1;
-          ((SeekableByteChannel) channel).position(requiredPosition);
-          findSeparatorBounds();
-          buffer = buffer.substring(endOfSeparatorInBuffer);
-          startOfNextRecord = requiredPosition + endOfSeparatorInBuffer;
-          endOfSeparatorInBuffer = 0;
-          startOfSeparatorInBuffer = 0;
-        }
-      }
-
-      /**
-       * Locates the start position and end position of the next delimiter. Will
-       * consume the channel till either EOF or the delimiter bounds are found.
-       *
-       * <p>This fills the buffer and updates the positions as follows:
-       * <pre>{@code
-       * ------------------------------------------------------
-       * | element bytes | delimiter bytes | unconsumed bytes |
-       * ------------------------------------------------------
-       * 0            start of          end of              buffer
-       *              separator         separator           size
-       *              in buffer         in buffer
-       * }</pre>
-       */
-      private void findSeparatorBounds() throws IOException {
-        int bytePositionInBuffer = 0;
-        while (true) {
-          if (!tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 1)) {
-            startOfSeparatorInBuffer = endOfSeparatorInBuffer = bytePositionInBuffer;
-            break;
-          }
-
-          byte currentByte = buffer.byteAt(bytePositionInBuffer);
-
-          if (currentByte == '\n') {
-            startOfSeparatorInBuffer = bytePositionInBuffer;
-            endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;
-            break;
-          } else if (currentByte == '\r') {
-            startOfSeparatorInBuffer = bytePositionInBuffer;
-            endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;
-
-            if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 2)) {
-              currentByte = buffer.byteAt(bytePositionInBuffer + 1);
-              if (currentByte == '\n') {
-                endOfSeparatorInBuffer += 1;
-              }
-            }
-            break;
-          }
-
-          // Move to the next byte in buffer.
-          bytePositionInBuffer += 1;
-        }
-      }
-
-      @Override
-      protected boolean readNextRecord() throws IOException {
-        startOfNextRecord += endOfSeparatorInBuffer;
-        findSeparatorBounds();
-
-        // If we have reached EOF and consumed all of the buffer, then we know
-        // that there are no more records.
-        if (eof && buffer.size() == 0) {
-          elementIsPresent = false;
-          return false;
-        }
-
-        decodeCurrentElement();
-        return true;
-      }
-
-      /**
-       * Decodes the current element updating the buffer to only contain the unconsumed bytes.
-       *
-       * This invalidates the currently stored {@code startOfSeparatorInBuffer} and
-       * {@code endOfSeparatorInBuffer}.
-       */
-      private void decodeCurrentElement() throws IOException {
-        ByteString dataToDecode = buffer.substring(0, startOfSeparatorInBuffer);
-        currentValue = coder.decode(dataToDecode.newInput(), Context.OUTER);
-        elementIsPresent = true;
-        buffer = buffer.substring(endOfSeparatorInBuffer);
-      }
-
-      /**
-       * Returns false if we were unable to ensure the minimum capacity by consuming the channel.
-       */
-      private boolean tryToEnsureNumberOfBytesInBuffer(int minCapacity) throws IOException {
-        // While we aren't at EOF or haven't fulfilled the minimum buffer capacity,
-        // attempt to read more bytes.
-        while (buffer.size() <= minCapacity && !eof) {
-          eof = inChannel.read(readBuffer) == -1;
-          readBuffer.flip();
-          buffer = buffer.concat(ByteString.copyFrom(readBuffer));
-          readBuffer.clear();
-        }
-        // Return true if we were able to honor the minimum buffer capacity request
-        return buffer.size() >= minCapacity;
-      }
-    }
-  }
-
-  /**
-   * A {@link FileBasedSink} for text files. Produces text files with the new line separator
-   * {@code '\n'} represented in {@code UTF-8} format as the record separator.
-   * Each record (including the last) is terminated.
-   */
-  @VisibleForTesting
-  static class TextSink<T> extends FileBasedSink<T> {
-    private final Coder<T> coder;
-
-    @VisibleForTesting
-    TextSink(
-        String baseOutputFilename, String extension, String fileNameTemplate, Coder<T> coder) {
-      super(baseOutputFilename, extension, fileNameTemplate);
-      this.coder = coder;
-    }
-
-    @Override
-    public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options) {
-      return new TextWriteOperation<>(this, coder);
-    }
-
-    /**
-     * A {@link com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriteOperation
-     * FileBasedWriteOperation} for text files.
-     */
-    private static class TextWriteOperation<T> extends FileBasedWriteOperation<T> {
-      private final Coder<T> coder;
-
-      private TextWriteOperation(TextSink<T> sink, Coder<T> coder) {
-        super(sink);
-        this.coder = coder;
-      }
-
-      @Override
-      public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception {
-        return new TextWriter<>(this, coder);
-      }
-    }
-
-    /**
-     * A {@link com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriter FileBasedWriter}
-     * for text files.
-     */
-    private static class TextWriter<T> extends FileBasedWriter<T> {
-      private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
-      private final Coder<T> coder;
-      private OutputStream out;
-
-      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) {
-        super(writeOperation);
-        this.mimeType = MimeTypes.TEXT;
-        this.coder = coder;
-      }
-
-      @Override
-      protected void prepareWrite(WritableByteChannel channel) throws Exception {
-        out = Channels.newOutputStream(channel);
-      }
-
-      @Override
-      public void write(T value) throws Exception {
-        coder.encode(value, out, Context.OUTER);
-        out.write(NEWLINE);
-      }
-    }
-  }
-}
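
For reference, a minimal usage sketch of the TextIO.Write API documented in the file above. The pipeline setup, element values, output prefix, and class name are illustrative placeholders, not part of the SDK:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.values.PCollection;

import java.util.Arrays;

public class TextWriteExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> lines =
        p.apply(Create.of(Arrays.asList("alpha", "beta", "gamma")));

    // Writes three shards named like lines-00000-of-00003.txt under the given prefix.
    // withoutValidation() skips the output path check at pipeline construction time,
    // as described in the Javadoc above; the GCS path is a placeholder.
    lines.apply(TextIO.Write
        .to("gs://example-bucket/output/lines")
        .named("WriteLines")
        .withSuffix(".txt")
        .withNumShards(3)
        .withoutValidation());

    p.run();
  }
}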

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/UnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/UnboundedSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/UnboundedSource.java
deleted file mode 100644
index e585151..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/UnboundedSource.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link Source} that reads an unbounded amount of input and, because of that, supports
- * some additional operations such as checkpointing, watermarks, and record ids.
- *
- * <ul>
- * <li> Checkpointing allows sources to not re-read the same data again in the case of failures.
- * <li> Watermarks allow for downstream parts of the pipeline to know up to what point
- *   in time the data is complete.
- * <li> Record ids allow for efficient deduplication of input records; many streaming sources
- *   do not guarantee that a given record will only be read a single time.
- * </ul>
- *
- * <p>See {@link com.google.cloud.dataflow.sdk.transforms.windowing.Window} and
- * {@link com.google.cloud.dataflow.sdk.transforms.windowing.Trigger} for more information on
- * timestamps and watermarks.
- *
- * @param <OutputT> Type of records output by this source.
- * @param <CheckpointMarkT> Type of checkpoint marks used by the readers of this source.
- */
-public abstract class UnboundedSource<
-        OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> extends Source<OutputT> {
-  /**
-   * Returns a list of {@code UnboundedSource} objects representing the instances of this source
-   * that should be used when executing the workflow.  Each split should return a separate partition
-   * of the input data.
-   *
-   * <p>For example, for a source reading from a growing directory of files, each split
-   * could correspond to a prefix of file names.
-   *
-   * <p>Some sources are not splittable, such as reading from a single TCP stream.  In that
-   * case, only a single split should be returned.
-   *
-   * <p>Some data sources automatically partition their data among readers.  For these types of
-   * inputs, {@code n} identical replicas of the top-level source can be returned.
-   *
-   * <p>The size of the returned list should be as close to {@code desiredNumSplits}
-   * as possible, but does not have to match exactly.  A low number of splits
-   * will limit the amount of parallelism in the source.
-   */
-  public abstract List<? extends UnboundedSource<OutputT, CheckpointMarkT>> generateInitialSplits(
-      int desiredNumSplits, PipelineOptions options) throws Exception;
-
-  /**
-   * Create a new {@link UnboundedReader} to read from this source, resuming from the given
-   * checkpoint if present.
-   */
-  public abstract UnboundedReader<OutputT> createReader(
-      PipelineOptions options, @Nullable CheckpointMarkT checkpointMark);
-
-  /**
-   * Returns a {@link Coder} for encoding and decoding the checkpoints for this source, or
-   * null if the checkpoints do not need to be durably committed.
-   */
-  @Nullable
-  public abstract Coder<CheckpointMarkT> getCheckpointMarkCoder();
-
-  /**
-   * Returns whether this source requires explicit deduping.
-   *
-   * <p>This is needed if the underlying data source can return the same record multiple times,
-   * such as a queuing system with a pull-ack model.  Sources where the records read are uniquely
-   * identified by the persisted state in the CheckpointMark do not need this.
-   */
-  public boolean requiresDeduping() {
-    return false;
-  }
-
-  /**
-   * A marker representing the progress and state of an
-   * {@link com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader}.
-   *
-   * <p>For example, this could be offsets in a set of files being read.
-   */
-  public interface CheckpointMark {
-    /**
-     * Perform any finalization that needs to happen after a bundle of data read from
-     * the source has been processed and committed.
-     *
-     * <p>For example, this could be sending acknowledgement requests to an external
-     * data source such as Pub/Sub.
-     *
-     * <p>This may be called from any thread, potentially at the same time as calls to the
-     * {@code UnboundedReader} that created it.
-     */
-    void finalizeCheckpoint() throws IOException;
-  }
-
-  /**
-   * A {@code Reader} that reads an unbounded amount of input.
-   *
-   * <p>A given {@code UnboundedReader} object will only be accessed by a single thread at once.
-   */
-  @Experimental(Experimental.Kind.SOURCE_SINK)
-  public abstract static class UnboundedReader<OutputT> extends Source.Reader<OutputT> {
-    private static final byte[] EMPTY = new byte[0];
-
-    /**
-     * Initializes the reader and advances the reader to the first record.
-     *
-     * <p>This method should be called exactly once. The invocation should occur prior to calling
-     * {@link #advance} or {@link #getCurrent}. This method may perform expensive operations that
-     * are needed to initialize the reader.
-     *
-     * <p>Returns {@code true} if a record was read, {@code false} if there is no more input
-     * currently available.  Future calls to {@link #advance} may return {@code true} once more data
-     * is available. Regardless of the return value of {@code start}, {@code start} will not be
-     * called again on the same {@code UnboundedReader} object; it will only be called again when a
-     * new reader object is constructed for the same source, e.g. on recovery.
-     */
-    @Override
-    public abstract boolean start() throws IOException;
-
-    /**
-     * Advances the reader to the next valid record.
-     *
-     * <p>Returns {@code true} if a record was read, {@code false} if there is no more input
-     * available. Future calls to {@link #advance} may return {@code true} once more data is
-     * available.
-     */
-    @Override
-    public abstract boolean advance() throws IOException;
-
-    /**
-     * Returns a unique identifier for the current record.  This should be the same for each
-     * instance of the same logical record read from the underlying data source.
-     *
-     * <p>It is only necessary to override this if {@link #requiresDeduping} has been overridden to
-     * return true.
-     *
-     * <p>For example, this could be a hash of the record contents, or a logical ID present in
-     * the record.  If this is generated as a hash of the record contents, it should be at least 16
-     * bytes (128 bits) to avoid collisions.
-     *
-     * <p>This method has the same restrictions on when it can be called as {@link #getCurrent} and
-     * {@link #getCurrentTimestamp}.
-     *
-     * @throws NoSuchElementException if the reader is at the beginning of the input and
-     *         {@link #start} or {@link #advance} wasn't called, or if the last {@link #start} or
-     *         {@link #advance} returned {@code false}.
-     */
-    public byte[] getCurrentRecordId() throws NoSuchElementException {
-      if (getCurrentSource().requiresDeduping()) {
-        throw new IllegalStateException(
-            "getCurrentRecordId() must be overridden if requiresDeduping() returns true");
-      }
-      return EMPTY;
-    }
-
-    /**
-     * Returns a timestamp before or at the timestamps of all future elements read by this reader.
-     *
-     * <p>This can be approximate.  If records are read that violate this guarantee, they will be
-     * considered late, which will affect how they will be processed.  See
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.Window} for more information on
-     * late data and how to handle it.
-     *
-     * <p>However, this value should be as late as possible. Downstream windows may not be able
-     * to close until this watermark passes their end.
-     *
-     * <p>For example, a source may know that the records it reads will be in timestamp order.  In
-     * this case, the watermark can be the timestamp of the last record read.  For a
-     * source that does not have natural timestamps, timestamps can be set to the time of
-     * reading, in which case the watermark is the current clock time.
-     *
-     * <p>See {@link com.google.cloud.dataflow.sdk.transforms.windowing.Window} and
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.Trigger} for more
-     * information on timestamps and watermarks.
-     *
-     * <p>May be called after {@link #advance} or {@link #start} has returned false, but not before
-     * {@link #start} has been called.
-     */
-    public abstract Instant getWatermark();
-
-    /**
-     * Returns a {@link CheckpointMark} representing the progress of this {@code UnboundedReader}.
-     *
-     * <p>The elements read up until this is called will be processed together as a bundle. Once
-     * the result of this processing has been durably committed,
-     * {@link CheckpointMark#finalizeCheckpoint} will be called on the {@link CheckpointMark}
-     * object.
-     *
-     * <p>The returned object should not be modified.
-     *
-     * <p>May be called after {@link #advance} or {@link #start} has returned false, but not before
-     * {@link #start} has been called.
-     */
-    public abstract CheckpointMark getCheckpointMark();
-
-    /**
-     * Constant representing an unknown amount of backlog.
-     */
-    public static final long BACKLOG_UNKNOWN = -1L;
-
-    /**
-     * Returns the size of the backlog of unread data in the underlying data source represented by
-     * this split of this source.
-     *
-     * <p>Either this method or {@link #getTotalBacklogBytes} should be overridden to allow the
-     * runner to scale the amount of resources allocated to the pipeline.
-     */
-    public long getSplitBacklogBytes() {
-      return BACKLOG_UNKNOWN;
-    }
-
-    /**
-     * Returns the size of the backlog of unread data in the underlying data source represented by
-     * all splits of this source.
-     *
-     * <p>Either this method or {@link #getSplitBacklogBytes} should be overridden to allow the
-     * runner to scale the amount of resources allocated to the pipeline.
-     */
-    public long getTotalBacklogBytes() {
-      return BACKLOG_UNKNOWN;
-    }
-
-    /**
-     * Returns the {@link UnboundedSource} that created this reader.  This will not change over the
-     * life of the reader.
-     */
-    @Override
-    public abstract UnboundedSource<OutputT, ?> getCurrentSource();
-  }
-}
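
Likewise, a sketch of how the UnboundedSource/UnboundedReader contract documented above fits together (splitting, start/advance, checkpoints, watermarks). The driver method, its parameters, and the class name are illustrative assumptions; it is not how any runner actually consumes a source:

import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.io.UnboundedSource.CheckpointMark;
import com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;

public class UnboundedReadSketch {
  /**
   * Reads at most maxRecordsPerSplit currently-available records from each split of the
   * given source, then checkpoints. Illustrates the start()/advance() contract above.
   */
  static <T, C extends CheckpointMark> void readOnce(
      UnboundedSource<T, C> source, PipelineOptions options, int maxRecordsPerSplit)
      throws Exception {
    // Ask the source for its splits; a non-splittable source returns a single split.
    for (UnboundedSource<T, C> split : source.generateInitialSplits(4, options)) {
      // No prior checkpoint, so pass null and read from the start of the stream.
      UnboundedReader<T> reader = split.createReader(options, null);

      int read = 0;
      boolean available = reader.start();
      while (available) {
        // getCurrent() and getCurrentTimestamp() are only valid after start() or
        // advance() has returned true.
        System.out.println(reader.getCurrentTimestamp() + ": " + reader.getCurrent());
        if (++read >= maxRecordsPerSplit) {
          break;
        }
        available = reader.advance();
      }

      // In this sketch the bundle's "processing" is just the println calls above, so the
      // checkpoint is finalized immediately; a real runner finalizes only after the
      // bundle's results have been durably committed (e.g. before acking the source).
      CheckpointMark mark = reader.getCheckpointMark();
      System.out.println("watermark: " + reader.getWatermark());
      mark.finalizeCheckpoint();
      reader.close();
    }
  }
}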

