beam-commits mailing list archives

From da...@apache.org
Subject [13/74] [partial] incubator-beam git commit: Rename com/google/cloud/dataflow->org/apache/beam
Date Thu, 14 Apr 2016 04:48:00 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java
deleted file mode 100644
index ed25926..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java
+++ /dev/null
@@ -1,991 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.Coder.Context;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import com.google.cloud.dataflow.sdk.io.Read.Bounded;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
-import com.google.cloud.dataflow.sdk.util.MimeTypes;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PDone;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.SeekableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.nio.charset.StandardCharsets;
-import java.util.NoSuchElementException;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nullable;
-
-/**
- * {@link PTransform}s for reading and writing text files.
- *
- * <p>To read a {@link PCollection} from one or more text files, use {@link TextIO.Read}.
- * You can instantiate a transform using {@link TextIO.Read#from(String)} to specify
- * the path of the file(s) to read from (e.g., a local filename or
- * filename pattern if running locally, or a Google Cloud Storage
- * filename or filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}). You may optionally call
- * {@link TextIO.Read#named(String)} to specify the name of the pipeline step.
- *
- * <p>By default, {@link TextIO.Read} returns a {@link PCollection} of {@link String Strings},
- * each corresponding to one line of an input UTF-8 text file. To convert directly from the raw
- * bytes (split into lines delimited by '\n', '\r', or '\r\n') to another object of type {@code T},
- * supply a {@code Coder<T>} using {@link TextIO.Read#withCoder(Coder)}.
- *
- * <p>See the following examples:
- *
- * <pre>{@code
- * Pipeline p = ...;
- *
- * // A simple Read of a local file (only runs locally):
- * PCollection<String> lines =
- *     p.apply(TextIO.Read.from("/local/path/to/file.txt"));
- *
- * // A fully-specified Read from a GCS file (runs locally and via the
- * // Google Cloud Dataflow service):
- * PCollection<Integer> numbers =
- *     p.apply(TextIO.Read.named("ReadNumbers")
- *                        .from("gs://my_bucket/path/to/numbers-*.txt")
- *                        .withCoder(TextualIntegerCoder.of()));
- * }</pre>
- *
- * <p>To write a {@link PCollection} to one or more text files, use
- * {@link TextIO.Write}, specifying {@link TextIO.Write#to(String)} to specify
- * the path of the file to write to (e.g., a local filename or sharded
- * filename pattern if running locally, or a Google Cloud Storage
- * filename or sharded filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}). You can optionally name the resulting transform using
- * {@link TextIO.Write#named(String)}, and you can use {@link TextIO.Write#withCoder(Coder)}
- * to specify the Coder to use to encode the Java values into text lines.
- *
- * <p>Any existing files with the same names as generated output files
- * will be overwritten.
- *
- * <p>For example:
- * <pre>{@code
- * // A simple Write to a local file (only runs locally):
- * PCollection<String> lines = ...;
- * lines.apply(TextIO.Write.to("/path/to/file.txt"));
- *
- * // A fully-specified Write to a sharded GCS file (runs locally and via the
- * // Google Cloud Dataflow service):
- * PCollection<Integer> numbers = ...;
- * numbers.apply(TextIO.Write.named("WriteNumbers")
- *                           .to("gs://my_bucket/path/to/numbers")
- *                           .withSuffix(".txt")
- *                           .withCoder(TextualIntegerCoder.of()));
- * }</pre>
- *
- * <h3>Permissions</h3>
- * <p>When run using the {@link DirectPipelineRunner}, your pipeline can read and write text files
- * on your local drive and remote text files on Google Cloud Storage that you have access to using
- * your {@code gcloud} credentials. When running in the Dataflow service, the pipeline can only
- * read and write files from GCS. For more information about permissions, see the Cloud Dataflow
- * documentation on <a href="https://cloud.google.com/dataflow/security-and-permissions">Security
- * and Permissions</a>.
- */
-public class TextIO {
-  /** The default coder, which returns each line of the input file as a string. */
-  public static final Coder<String> DEFAULT_TEXT_CODER = StringUtf8Coder.of();
-
-  /**
-   * A {@link PTransform} that reads from a text file (or multiple text
-   * files matching a pattern) and returns a {@link PCollection} containing
-   * the decoding of each of the lines of the text file(s). The
-   * default decoding just returns each line as a {@link String}, but you may call
-   * {@link #withCoder(Coder)} to change the return type.
-   */
-  public static class Read {
-    /**
-     * Returns a transform for reading text files that uses the given step name.
-     */
-    public static Bound<String> named(String name) {
-      return new Bound<>(DEFAULT_TEXT_CODER).named(name);
-    }
-
-    /**
-     * Returns a transform for reading text files that reads from the file(s)
-     * with the given filename or filename pattern. This can be a local path (if running locally),
-     * or a Google Cloud Storage filename or filename pattern of the form
-     * {@code "gs://<bucket>/<filepath>"} (if running locally or via the Google Cloud Dataflow
-     * service). Standard <a href="http://docs.oracle.com/javase/tutorial/essential/io/find.html"
-     * >Java Filesystem glob patterns</a> ("*", "?", "[..]") are supported.
-     */
-    public static Bound<String> from(String filepattern) {
-      return new Bound<>(DEFAULT_TEXT_CODER).from(filepattern);
-    }
-
-    /**
-     * Returns a transform for reading text files that uses the given
-     * {@code Coder<T>} to decode each of the lines of the file into a
-     * value of type {@code T}.
-     *
-     * <p>By default, uses {@link StringUtf8Coder}, which just
-     * returns the text lines as Java strings.
-     *
-     * @param <T> the type of the decoded elements, and the elements
-     * of the resulting PCollection
-     */
-    public static <T> Bound<T> withCoder(Coder<T> coder) {
-      return new Bound<>(coder);
-    }
-
-    /**
-     * Returns a transform for reading text files that has GCS path validation on
-     * pipeline creation disabled.
-     *
-     * <p>This can be useful in the case where the GCS input does not
-     * exist at the pipeline creation time, but is expected to be
-     * available at execution time.
-     */
-    public static Bound<String> withoutValidation() {
-      return new Bound<>(DEFAULT_TEXT_CODER).withoutValidation();
-    }
-
-    /**
-     * Returns a transform for reading text files that decompresses all input files
-     * using the specified compression type.
-     *
-     * <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}.
-     * In this mode, the compression type of the file is determined by its extension
-     * (e.g., {@code *.gz} is gzipped, {@code *.bz2} is bzipped, and all other extensions are
-     * uncompressed).
-     */
-    public static Bound<String> withCompressionType(TextIO.CompressionType compressionType) {
-      return new Bound<>(DEFAULT_TEXT_CODER).withCompressionType(compressionType);
-    }
-
-    // TODO: strippingNewlines, etc.
-
-    /**
-     * A {@link PTransform} that reads from one or more text files and returns a bounded
-     * {@link PCollection} containing one element for each line of the input files.
-     *
-     * @param <T> the type of each of the elements of the resulting
-     * {@link PCollection}. By default, each line is returned as a {@link String}, however you
-     * may use {@link #withCoder(Coder)} to supply a {@code Coder<T>} to produce a
-     * {@code PCollection<T>} instead.
-     */
-    public static class Bound<T> extends PTransform<PInput, PCollection<T>> {
-      /** The filepattern to read from. */
-      @Nullable private final String filepattern;
-
-      /** The Coder to use to decode each line. */
-      private final Coder<T> coder;
-
-      /** An option to indicate if input validation is desired. Default is true. */
-      private final boolean validate;
-
-      /** Option to indicate the input source's compression type. Default is AUTO. */
-      private final TextIO.CompressionType compressionType;
-
-      Bound(Coder<T> coder) {
-        this(null, null, coder, true, TextIO.CompressionType.AUTO);
-      }
-
-      private Bound(String name, String filepattern, Coder<T> coder, boolean validate,
-          TextIO.CompressionType compressionType) {
-        super(name);
-        this.coder = coder;
-        this.filepattern = filepattern;
-        this.validate = validate;
-        this.compressionType = compressionType;
-      }
-
-      /**
-       * Returns a new transform for reading from text files that's like this one but
-       * with the given step name.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> named(String name) {
-        return new Bound<>(name, filepattern, coder, validate, compressionType);
-      }
-
-      /**
-       * Returns a new transform for reading from text files that's like this one but
-       * that reads from the file(s) with the given name or pattern. See {@link TextIO.Read#from}
-       * for a description of filepatterns.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> from(String filepattern) {
-        return new Bound<>(name, filepattern, coder, validate, compressionType);
-      }
-
-      /**
-       * Returns a new transform for reading from text files that's like this one but
-       * that uses the given {@link Coder Coder<X>} to decode each of the
-       * lines of the file into a value of type {@code X}.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param <X> the type of the decoded elements, and the
-       * elements of the resulting PCollection
-       */
-      public <X> Bound<X> withCoder(Coder<X> coder) {
-        return new Bound<>(name, filepattern, coder, validate, compressionType);
-      }
-
-      /**
-       * Returns a new transform for reading from text files that's like this one but
-       * that has GCS path validation on pipeline creation disabled.
-       *
-       * <p>This can be useful in the case where the GCS input does not
-       * exist at the pipeline creation time, but is expected to be
-       * available at execution time.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> withoutValidation() {
-        return new Bound<>(name, filepattern, coder, false, compressionType);
-      }
-
-      /**
-       * Returns a new transform for reading from text files that's like this one but
-       * reads from input sources using the specified compression type.
-       *
-       * <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}.
-       * See {@link TextIO.Read#withCompressionType} for more details.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> withCompressionType(TextIO.CompressionType compressionType) {
-        return new Bound<>(name, filepattern, coder, validate, compressionType);
-      }
-
-      @Override
-      public PCollection<T> apply(PInput input) {
-        if (filepattern == null) {
-          throw new IllegalStateException("need to set the filepattern of a TextIO.Read transform");
-        }
-
-        if (validate) {
-          try {
-            checkState(
-                !IOChannelUtils.getFactory(filepattern).match(filepattern).isEmpty(),
-                "Unable to find any files matching %s",
-                filepattern);
-          } catch (IOException e) {
-            throw new IllegalStateException(
-                String.format("Failed to validate %s", filepattern), e);
-          }
-        }
-
-        // Create a source specific to the requested compression type.
-        final Bounded<T> read;
-        switch(compressionType) {
-          case UNCOMPRESSED:
-            read = com.google.cloud.dataflow.sdk.io.Read.from(
-                new TextSource<T>(filepattern, coder));
-            break;
-          case AUTO:
-            read = com.google.cloud.dataflow.sdk.io.Read.from(
-                CompressedSource.from(new TextSource<T>(filepattern, coder)));
-            break;
-          case BZIP2:
-            read = com.google.cloud.dataflow.sdk.io.Read.from(
-                CompressedSource.from(new TextSource<T>(filepattern, coder))
-                                .withDecompression(CompressedSource.CompressionMode.BZIP2));
-            break;
-          case GZIP:
-            read = com.google.cloud.dataflow.sdk.io.Read.from(
-                CompressedSource.from(new TextSource<T>(filepattern, coder))
-                                .withDecompression(CompressedSource.CompressionMode.GZIP));
-            break;
-          default:
-            throw new IllegalArgumentException("Unknown compression mode: " + compressionType);
-        }
-
-        PCollection<T> pcol = input.getPipeline().apply("Read", read);
-        // Honor the default output coder that would have been used by this PTransform.
-        pcol.setCoder(getDefaultOutputCoder());
-        return pcol;
-      }
-
-      @Override
-      protected Coder<T> getDefaultOutputCoder() {
-        return coder;
-      }
-
-      public String getFilepattern() {
-        return filepattern;
-      }
-
-      public boolean needsValidation() {
-        return validate;
-      }
-
-      public TextIO.CompressionType getCompressionType() {
-        return compressionType;
-      }
-    }
-
-    /** Disallow construction of utility classes. */
-    private Read() {}
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * A {@link PTransform} that writes a {@link PCollection} to text file (or
-   * multiple text files matching a sharding pattern), with each
-   * element of the input collection encoded into its own line.
-   */
-  public static class Write {
-    /**
-     * Returns a transform for writing to text files with the given step name.
-     */
-    public static Bound<String> named(String name) {
-      return new Bound<>(DEFAULT_TEXT_CODER).named(name);
-    }
-
-    /**
-     * Returns a transform for writing to text files that writes to the file(s)
-     * with the given prefix. This can be a local filename
-     * (if running locally), or a Google Cloud Storage filename of
-     * the form {@code "gs://<bucket>/<filepath>"}
-     * (if running locally or via the Google Cloud Dataflow service).
-     *
-     * <p>The files written will begin with this prefix, followed by
-     * a shard identifier (see {@link Bound#withNumShards(int)}), and end
-     * in a common extension, if given by {@link Bound#withSuffix(String)}.
-     */
-    public static Bound<String> to(String prefix) {
-      return new Bound<>(DEFAULT_TEXT_CODER).to(prefix);
-    }
-
-    /**
-     * Returns a transform for writing to text files that appends the specified suffix
-     * to the created files.
-     */
-    public static Bound<String> withSuffix(String nameExtension) {
-      return new Bound<>(DEFAULT_TEXT_CODER).withSuffix(nameExtension);
-    }
-
-    /**
-     * Returns a transform for writing to text files that uses the provided shard count.
-     *
-     * <p>Constraining the number of shards is likely to reduce
-     * the performance of a pipeline. Setting this value is not recommended
-     * unless you require a specific number of output files.
-     *
-     * @param numShards the number of shards to use, or 0 to let the system
-     *                  decide.
-     */
-    public static Bound<String> withNumShards(int numShards) {
-      return new Bound<>(DEFAULT_TEXT_CODER).withNumShards(numShards);
-    }
-
-    /**
-     * Returns a transform for writing to text files that uses the given shard name
-     * template.
-     *
-     * <p>See {@link ShardNameTemplate} for a description of shard templates.
-     */
-    public static Bound<String> withShardNameTemplate(String shardTemplate) {
-      return new Bound<>(DEFAULT_TEXT_CODER).withShardNameTemplate(shardTemplate);
-    }
-
-    /**
-     * Returns a transform for writing to text files that forces a single file as
-     * output.
-     */
-    public static Bound<String> withoutSharding() {
-      return new Bound<>(DEFAULT_TEXT_CODER).withoutSharding();
-    }
-
-    /**
-     * Returns a transform for writing to text files that uses the given
-     * {@link Coder} to encode each of the elements of the input
-     * {@link PCollection} into an output text line.
-     *
-     * <p>By default, uses {@link StringUtf8Coder}, which writes input
-     * Java strings directly as output lines.
-     *
-     * @param <T> the type of the elements of the input {@link PCollection}
-     */
-    public static <T> Bound<T> withCoder(Coder<T> coder) {
-      return new Bound<>(coder);
-    }
-
-    /**
-     * Returns a transform for writing to text files that has GCS path validation on
-     * pipeline creation disabled.
-     *
-     * <p>This can be useful in the case where the GCS output location does
-     * not exist at the pipeline creation time, but is expected to be available
-     * at execution time.
-     */
-    public static Bound<String> withoutValidation() {
-      return new Bound<>(DEFAULT_TEXT_CODER).withoutValidation();
-    }
-
-    // TODO: appendingNewlines, header, footer, etc.
-
-    /**
-     * A PTransform that writes a bounded PCollection to a text file (or
-     * multiple text files matching a sharding pattern), with each
-     * PCollection element being encoded into its own line.
-     *
-     * @param <T> the type of the elements of the input PCollection
-     */
-    public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
-      /** The prefix of each file written, combined with suffix and shardTemplate. */
-      @Nullable private final String filenamePrefix;
-      /** The suffix of each file written, combined with prefix and shardTemplate. */
-      private final String filenameSuffix;
-
-      /** The Coder to use to decode each line. */
-      private final Coder<T> coder;
-
-      /** Requested number of shards. 0 for automatic. */
-      private final int numShards;
-
-      /** The shard template of each file written, combined with prefix and suffix. */
-      private final String shardTemplate;
-
-      /** An option to indicate if output validation is desired. Default is true. */
-      private final boolean validate;
-
-      Bound(Coder<T> coder) {
-        this(null, null, "", coder, 0, ShardNameTemplate.INDEX_OF_MAX, true);
-      }
-
-      private Bound(String name, String filenamePrefix, String filenameSuffix, Coder<T> coder,
-          int numShards, String shardTemplate, boolean validate) {
-        super(name);
-        this.coder = coder;
-        this.filenamePrefix = filenamePrefix;
-        this.filenameSuffix = filenameSuffix;
-        this.numShards = numShards;
-        this.shardTemplate = shardTemplate;
-        this.validate = validate;
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * with the given step name.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> named(String name) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
-            shardTemplate, validate);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that writes to the file(s) with the given filename prefix.
-       *
-       * <p>See {@link TextIO.Write#to(String) Write.to(String)} for more information.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> to(String filenamePrefix) {
-        validateOutputComponent(filenamePrefix);
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
-            shardTemplate, validate);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that writes to the file(s) with the given filename suffix.
-       *
-       * <p>Does not modify this object.
-       *
-       * @see ShardNameTemplate
-       */
-      public Bound<T> withSuffix(String nameExtension) {
-        validateOutputComponent(nameExtension);
-        return new Bound<>(name, filenamePrefix, nameExtension, coder, numShards,
-            shardTemplate, validate);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that uses the provided shard count.
-       *
-       * <p>Constraining the number of shards is likely to reduce
-       * the performance of a pipeline. Setting this value is not recommended
-       * unless you require a specific number of output files.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param numShards the number of shards to use, or 0 to let the system
-       *                  decide.
-       * @see ShardNameTemplate
-       */
-      public Bound<T> withNumShards(int numShards) {
-        Preconditions.checkArgument(numShards >= 0);
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
-            shardTemplate, validate);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that uses the given shard name template.
-       *
-       * <p>Does not modify this object.
-       *
-       * @see ShardNameTemplate
-       */
-      public Bound<T> withShardNameTemplate(String shardTemplate) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
-            shardTemplate, validate);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that forces a single file as output.
-       *
-       * <p>Constraining the number of shards is likely to reduce
-       * the performance of a pipeline. Using this setting is not recommended
-       * unless you truly require a single output file.
-       *
-       * <p>This is a shortcut for
-       * {@code .withNumShards(1).withShardNameTemplate("")}
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> withoutSharding() {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, 1, "", validate);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one
-       * but that uses the given {@link Coder Coder<X>} to encode each of
-       * the elements of the input {@link PCollection PCollection<X>} into an
-       * output text line. Does not modify this object.
-       *
-       * @param <X> the type of the elements of the input {@link PCollection}
-       */
-      public <X> Bound<X> withCoder(Coder<X> coder) {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
-            shardTemplate, validate);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that has GCS output path validation on pipeline creation disabled.
-       *
-       * <p>This can be useful in the case where the GCS output location does
-       * not exist at the pipeline creation time, but is expected to be
-       * available at execution time.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> withoutValidation() {
-        return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
-            shardTemplate, false);
-      }
-
-      @Override
-      public PDone apply(PCollection<T> input) {
-        if (filenamePrefix == null) {
-          throw new IllegalStateException(
-              "need to set the filename prefix of a TextIO.Write transform");
-        }
-
-        // Note that custom sinks currently do not expose sharding controls.
-        // Thus pipeline runner writers need to individually add support internally to
-        // apply user requested sharding limits.
-        return input.apply("Write", com.google.cloud.dataflow.sdk.io.Write.to(
-            new TextSink<>(
-                filenamePrefix, filenameSuffix, shardTemplate, coder)));
-      }
-
-      /**
-       * Returns the current shard name template string.
-       */
-      public String getShardNameTemplate() {
-        return shardTemplate;
-      }
-
-      @Override
-      protected Coder<Void> getDefaultOutputCoder() {
-        return VoidCoder.of();
-      }
-
-      public String getFilenamePrefix() {
-        return filenamePrefix;
-      }
-
-      public String getShardTemplate() {
-        return shardTemplate;
-      }
-
-      public int getNumShards() {
-        return numShards;
-      }
-
-      public String getFilenameSuffix() {
-        return filenameSuffix;
-      }
-
-      public Coder<T> getCoder() {
-        return coder;
-      }
-
-      public boolean needsValidation() {
-        return validate;
-      }
-    }
-  }
-
-  /**
-   * Possible text file compression types.
-   */
-  public static enum CompressionType {
-    /**
-     * Automatically determine the compression type based on filename extension.
-     */
-    AUTO(""),
-    /**
-     * Uncompressed (i.e., may be split).
-     */
-    UNCOMPRESSED(""),
-    /**
-     * GZipped.
-     */
-    GZIP(".gz"),
-    /**
-     * BZipped.
-     */
-    BZIP2(".bz2");
-
-    private String filenameSuffix;
-
-    private CompressionType(String suffix) {
-      this.filenameSuffix = suffix;
-    }
-
-    /**
-     * Determine if a given filename matches a compression type based on its extension.
-     * @param filename the filename to match
-     * @return true iff the filename ends with the compression type's known extension.
-     */
-    public boolean matches(String filename) {
-      return filename.toLowerCase().endsWith(filenameSuffix.toLowerCase());
-    }
-  }
-
-  // Pattern which matches old-style shard output patterns, which are now
-  // disallowed.
-  private static final Pattern SHARD_OUTPUT_PATTERN = Pattern.compile("@([0-9]+|\\*)");
-
-  private static void validateOutputComponent(String partialFilePattern) {
-    Preconditions.checkArgument(
-        !SHARD_OUTPUT_PATTERN.matcher(partialFilePattern).find(),
-        "Output name components are not allowed to contain @* or @N patterns: "
-        + partialFilePattern);
-  }
-
-  //////////////////////////////////////////////////////////////////////////////
-
-  /** Disable construction of utility class. */
-  private TextIO() {}
-
-  /**
-   * A {@link FileBasedSource} which can decode records delimited by new line characters.
-   *
-   * <p>This source splits the data into records using {@code UTF-8} {@code \n}, {@code \r}, or
-   * {@code \r\n} as the delimiter. This source is not strict and supports decoding the last record
-   * even if it is not delimited. Finally, no records are decoded if the stream is empty.
-   *
-   * <p>This source supports reading from any arbitrary byte position within the stream. If the
-   * starting position is not {@code 0}, then bytes are skipped until the first delimiter is found
-   * representing the beginning of the first record to be decoded.
-   */
-  @VisibleForTesting
-  static class TextSource<T> extends FileBasedSource<T> {
-    /** The Coder to use to decode each line. */
-    private final Coder<T> coder;
-
-    @VisibleForTesting
-    TextSource(String fileSpec, Coder<T> coder) {
-      super(fileSpec, 1L);
-      this.coder = coder;
-    }
-
-    private TextSource(String fileName, long start, long end, Coder<T> coder) {
-      super(fileName, 1L, start, end);
-      this.coder = coder;
-    }
-
-    @Override
-    protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) {
-      return new TextSource<>(fileName, start, end, coder);
-    }
-
-    @Override
-    protected FileBasedReader<T> createSingleFileReader(PipelineOptions options) {
-      return new TextBasedReader<>(this);
-    }
-
-    @Override
-    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
-      return false;
-    }
-
-    @Override
-    public Coder<T> getDefaultOutputCoder() {
-      return coder;
-    }
-
-    /**
-     * A {@link com.google.cloud.dataflow.sdk.io.FileBasedSource.FileBasedReader FileBasedReader}
-     * which can decode records delimited by new line characters.
-     *
-     * See {@link TextSource} for further details.
-     */
-    @VisibleForTesting
-    static class TextBasedReader<T> extends FileBasedReader<T> {
-      private static final int READ_BUFFER_SIZE = 8192;
-      private final Coder<T> coder;
-      private final ByteBuffer readBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
-      private ByteString buffer;
-      private int startOfSeparatorInBuffer;
-      private int endOfSeparatorInBuffer;
-      private long startOfNextRecord;
-      private boolean eof;
-      private boolean elementIsPresent;
-      private T currentValue;
-      private ReadableByteChannel inChannel;
-
-      private TextBasedReader(TextSource<T> source) {
-        super(source);
-        coder = source.coder;
-        buffer = ByteString.EMPTY;
-      }
-
-      @Override
-      protected long getCurrentOffset() throws NoSuchElementException {
-        if (!elementIsPresent) {
-          throw new NoSuchElementException();
-        }
-        return startOfNextRecord;
-      }
-
-      @Override
-      public T getCurrent() throws NoSuchElementException {
-        if (!elementIsPresent) {
-          throw new NoSuchElementException();
-        }
-        return currentValue;
-      }
-
-      @Override
-      protected void startReading(ReadableByteChannel channel) throws IOException {
-        this.inChannel = channel;
-        // If the first offset is greater than zero, we need to skip bytes until we see our
-        // first separator.
-        if (getCurrentSource().getStartOffset() > 0) {
-          checkState(channel instanceof SeekableByteChannel,
-              "%s only supports reading from a SeekableByteChannel when given a start offset"
-              + " greater than 0.", TextSource.class.getSimpleName());
-          long requiredPosition = getCurrentSource().getStartOffset() - 1;
-          ((SeekableByteChannel) channel).position(requiredPosition);
-          findSeparatorBounds();
-          buffer = buffer.substring(endOfSeparatorInBuffer);
-          startOfNextRecord = requiredPosition + endOfSeparatorInBuffer;
-          endOfSeparatorInBuffer = 0;
-          startOfSeparatorInBuffer = 0;
-        }
-      }
-
-      /**
-       * Locates the start position and end position of the next delimiter. Will
-       * consume the channel till either EOF or the delimiter bounds are found.
-       *
-       * <p>This fills the buffer and updates the positions as follows:
-       * <pre>{@code
-       * ------------------------------------------------------
-       * | element bytes | delimiter bytes | unconsumed bytes |
-       * ------------------------------------------------------
-       * 0            start of          end of              buffer
-       *              separator         separator           size
-       *              in buffer         in buffer
-       * }</pre>
-       */
-      private void findSeparatorBounds() throws IOException {
-        int bytePositionInBuffer = 0;
-        while (true) {
-          if (!tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 1)) {
-            startOfSeparatorInBuffer = endOfSeparatorInBuffer = bytePositionInBuffer;
-            break;
-          }
-
-          byte currentByte = buffer.byteAt(bytePositionInBuffer);
-
-          if (currentByte == '\n') {
-            startOfSeparatorInBuffer = bytePositionInBuffer;
-            endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;
-            break;
-          } else if (currentByte == '\r') {
-            startOfSeparatorInBuffer = bytePositionInBuffer;
-            endOfSeparatorInBuffer = startOfSeparatorInBuffer + 1;
-
-            if (tryToEnsureNumberOfBytesInBuffer(bytePositionInBuffer + 2)) {
-              currentByte = buffer.byteAt(bytePositionInBuffer + 1);
-              if (currentByte == '\n') {
-                endOfSeparatorInBuffer += 1;
-              }
-            }
-            break;
-          }
-
-          // Move to the next byte in buffer.
-          bytePositionInBuffer += 1;
-        }
-      }
-
-      @Override
-      protected boolean readNextRecord() throws IOException {
-        startOfNextRecord += endOfSeparatorInBuffer;
-        findSeparatorBounds();
-
-        // If we have reached EOF and consumed all of the buffer then we know
-        // that there are no more records.
-        if (eof && buffer.size() == 0) {
-          elementIsPresent = false;
-          return false;
-        }
-
-        decodeCurrentElement();
-        return true;
-      }
-
-      /**
-       * Decodes the current element, updating the buffer to contain only the unconsumed bytes.
-       *
-       * This invalidates the currently stored {@code startOfSeparatorInBuffer} and
-       * {@code endOfSeparatorInBuffer}.
-       */
-      private void decodeCurrentElement() throws IOException {
-        ByteString dataToDecode = buffer.substring(0, startOfSeparatorInBuffer);
-        currentValue = coder.decode(dataToDecode.newInput(), Context.OUTER);
-        elementIsPresent = true;
-        buffer = buffer.substring(endOfSeparatorInBuffer);
-      }
-
-      /**
-       * Returns false if we were unable to ensure the minimum capacity by consuming the channel.
-       */
-      private boolean tryToEnsureNumberOfBytesInBuffer(int minCapacity) throws IOException {
-        // While we aren't at EOF or haven't fulfilled the minimum buffer capacity,
-        // attempt to read more bytes.
-        while (buffer.size() <= minCapacity && !eof) {
-          eof = inChannel.read(readBuffer) == -1;
-          readBuffer.flip();
-          buffer = buffer.concat(ByteString.copyFrom(readBuffer));
-          readBuffer.clear();
-        }
-        // Return true if we were able to honor the minimum buffer capacity request
-        return buffer.size() >= minCapacity;
-      }
-    }
-  }
-
-  /**
-   * A {@link FileBasedSink} for text files. Produces text files with the new line separator
-   * {@code '\n'} represented in {@code UTF-8} format as the record separator.
-   * Each record (including the last) is terminated.
-   */
-  @VisibleForTesting
-  static class TextSink<T> extends FileBasedSink<T> {
-    private final Coder<T> coder;
-
-    @VisibleForTesting
-    TextSink(
-        String baseOutputFilename, String extension, String fileNameTemplate, Coder<T> coder) {
-      super(baseOutputFilename, extension, fileNameTemplate);
-      this.coder = coder;
-    }
-
-    @Override
-    public FileBasedSink.FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options) {
-      return new TextWriteOperation<>(this, coder);
-    }
-
-    /**
-     * A {@link com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriteOperation
-     * FileBasedWriteOperation} for text files.
-     */
-    private static class TextWriteOperation<T> extends FileBasedWriteOperation<T> {
-      private final Coder<T> coder;
-
-      private TextWriteOperation(TextSink<T> sink, Coder<T> coder) {
-        super(sink);
-        this.coder = coder;
-      }
-
-      @Override
-      public FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception {
-        return new TextWriter<>(this, coder);
-      }
-    }
-
-    /**
-     * A {@link com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriter FileBasedWriter}
-     * for text files.
-     */
-    private static class TextWriter<T> extends FileBasedWriter<T> {
-      private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
-      private final Coder<T> coder;
-      private OutputStream out;
-
-      public TextWriter(FileBasedWriteOperation<T> writeOperation, Coder<T> coder) {
-        super(writeOperation);
-        this.mimeType = MimeTypes.TEXT;
-        this.coder = coder;
-      }
-
-      @Override
-      protected void prepareWrite(WritableByteChannel channel) throws Exception {
-        out = Channels.newOutputStream(channel);
-      }
-
-      @Override
-      public void write(T value) throws Exception {
-        coder.encode(value, out, Context.OUTER);
-        out.write(NEWLINE);
-      }
-    }
-  }
-}
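
For reference, a minimal sketch of how the TextIO transforms deleted above were typically wired into a pipeline. The package names are the pre-rename com.google.cloud.dataflow ones from this file; the bucket paths, step names, and options setup are illustrative assumptions, not part of this commit:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class TextIOExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // Read gzipped text; validation is skipped because the files may only
    // exist at execution time (see Read.Bound#withoutValidation above).
    PCollection<String> lines = p.apply(
        TextIO.Read.named("ReadLogs")
                   .from("gs://example-bucket/input/*.gz")
                   .withCompressionType(TextIO.CompressionType.GZIP)
                   .withoutValidation());

    // Write the lines back out as a single uncompressed text file;
    // withoutSharding() is shorthand for one shard and an empty shard template.
    lines.apply(
        TextIO.Write.named("WriteLogs")
                    .to("gs://example-bucket/output/lines")
                    .withSuffix(".txt")
                    .withoutSharding());

    p.run();
  }
}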

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/UnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/UnboundedSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/UnboundedSource.java
deleted file mode 100644
index 9f4a234..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/UnboundedSource.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-
-import org.joda.time.Instant;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
-/**
- * A {@link Source} that reads an unbounded amount of input and, because of that, supports
- * some additional operations such as checkpointing, watermarks, and record ids.
- *
- * <ul>
- * <li> Checkpointing allows sources to avoid re-reading the same data in the case of failures.
- * <li> Watermarks allow for downstream parts of the pipeline to know up to what point
- *   in time the data is complete.
- * <li> Record ids allow for efficient deduplication of input records; many streaming sources
- *   do not guarantee that a given record will only be read a single time.
- * </ul>
- *
- * <p>See {@link com.google.cloud.dataflow.sdk.transforms.windowing.Window} and
- * {@link com.google.cloud.dataflow.sdk.transforms.windowing.Trigger} for more information on
- * timestamps and watermarks.
- *
- * @param <OutputT> Type of records output by this source.
- * @param <CheckpointMarkT> Type of checkpoint marks used by the readers of this source.
- */
-public abstract class UnboundedSource<
-        OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> extends Source<OutputT> {
-  /**
-   * Returns a list of {@code UnboundedSource} objects representing the instances of this source
-   * that should be used when executing the workflow.  Each split should return a separate partition
-   * of the input data.
-   *
-   * <p>For example, for a source reading from a growing directory of files, each split
-   * could correspond to a prefix of file names.
-   *
-   * <p>Some sources are not splittable, such as reading from a single TCP stream.  In that
-   * case, only a single split should be returned.
-   *
-   * <p>Some data sources automatically partition their data among readers.  For these types of
-   * inputs, {@code n} identical replicas of the top-level source can be returned.
-   *
-   * <p>The size of the returned list should be as close to {@code desiredNumSplits}
-   * as possible, but does not have to match exactly.  A low number of splits
-   * will limit the amount of parallelism in the source.
-   */
-  public abstract List<? extends UnboundedSource<OutputT, CheckpointMarkT>> generateInitialSplits(
-      int desiredNumSplits, PipelineOptions options) throws Exception;
-
-  /**
-   * Create a new {@link UnboundedReader} to read from this source, resuming from the given
-   * checkpoint if present.
-   */
-  public abstract UnboundedReader<OutputT> createReader(
-      PipelineOptions options, @Nullable CheckpointMarkT checkpointMark);
-
-  /**
-   * Returns a {@link Coder} for encoding and decoding the checkpoints for this source, or
-   * null if the checkpoints do not need to be durably committed.
-   */
-  @Nullable
-  public abstract Coder<CheckpointMarkT> getCheckpointMarkCoder();
-
-  /**
-   * Returns whether this source requires explicit deduping.
-   *
-   * <p>This is needed if the underlying data source can return the same record multiple times,
-   * such as a queuing system with a pull-ack model.  Sources where the records read are uniquely
-   * identified by the persisted state in the CheckpointMark do not need this.
-   */
-  public boolean requiresDeduping() {
-    return false;
-  }
-
-  /**
-   * A marker representing the progress and state of an
-   * {@link com.google.cloud.dataflow.sdk.io.UnboundedSource.UnboundedReader}.
-   *
-   * <p>For example, this could be offsets in a set of files being read.
-   */
-  public interface CheckpointMark {
-    /**
-     * Perform any finalization that needs to happen after a bundle of data read from
-     * the source has been processed and committed.
-     *
-     * <p>For example, this could be sending acknowledgement requests to an external
-     * data source such as Pub/Sub.
-     *
-     * <p>This may be called from any thread, potentially at the same time as calls to the
-     * {@code UnboundedReader} that created it.
-     */
-    void finalizeCheckpoint() throws IOException;
-  }
-
-  /**
-   * A {@code Reader} that reads an unbounded amount of input.
-   *
-   * <p>A given {@code UnboundedReader} object will only be accessed by a single thread at once.
-   */
-  @Experimental(Experimental.Kind.SOURCE_SINK)
-  public abstract static class UnboundedReader<OutputT> extends Source.Reader<OutputT> {
-    private static final byte[] EMPTY = new byte[0];
-
-    /**
-     * Initializes the reader and advances the reader to the first record.
-     *
-     * <p>This method should be called exactly once. The invocation should occur prior to calling
-     * {@link #advance} or {@link #getCurrent}. This method may perform expensive operations that
-     * are needed to initialize the reader.
-     *
-     * <p>Returns {@code true} if a record was read, {@code false} if there is no more input
-     * currently available.  Future calls to {@link #advance} may return {@code true} once more data
-     * is available. Regardless of the return value of {@code start}, {@code start} will not be
-     * called again on the same {@code UnboundedReader} object; it will only be called again when a
-     * new reader object is constructed for the same source, e.g. on recovery.
-     */
-    @Override
-    public abstract boolean start() throws IOException;
-
-    /**
-     * Advances the reader to the next valid record.
-     *
-     * <p>Returns {@code true} if a record was read, {@code false} if there is no more input
-     * available. Future calls to {@link #advance} may return {@code true} once more data is
-     * available.
-     */
-    @Override
-    public abstract boolean advance() throws IOException;
-
-    /**
-     * Returns a unique identifier for the current record.  This should be the same for each
-     * instance of the same logical record read from the underlying data source.
-     *
-     * <p>It is only necessary to override this if {@link #requiresDeduping} has been overridden to
-     * return true.
-     *
-     * <p>For example, this could be a hash of the record contents, or a logical ID present in
-     * the record.  If this is generated as a hash of the record contents, it should be at least 16
-     * bytes (128 bits) to avoid collisions.
-     *
-     * <p>This method has the same restrictions on when it can be called as {@link #getCurrent} and
-     * {@link #getCurrentTimestamp}.
-     *
-     * @throws NoSuchElementException if the reader is at the beginning of the input and
-     *         {@link #start} or {@link #advance} wasn't called, or if the last {@link #start} or
-     *         {@link #advance} returned {@code false}.
-     */
-    public byte[] getCurrentRecordId() throws NoSuchElementException {
-      if (getCurrentSource().requiresDeduping()) {
-        throw new IllegalStateException(
-            "getCurrentRecordId() must be overridden if requiresDeduping returns true()");
-      }
-      return EMPTY;
-    }
-
-    /**
-     * Returns a timestamp before or at the timestamps of all future elements read by this reader.
-     *
-     * <p>This can be approximate.  If records are read that violate this guarantee, they will be
-     * considered late, which will affect how they will be processed.  See
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.Window} for more information on
-     * late data and how to handle it.
-     *
-     * <p>However, this value should be as late as possible. Downstream windows may not be able
-     * to close until this watermark passes their end.
-     *
-     * <p>For example, a source may know that the records it reads will be in timestamp order.  In
-     * this case, the watermark can be the timestamp of the last record read.  For a
-     * source that does not have natural timestamps, timestamps can be set to the time of
-     * reading, in which case the watermark is the current clock time.
-     *
-     * <p>See {@link com.google.cloud.dataflow.sdk.transforms.windowing.Window} and
-     * {@link com.google.cloud.dataflow.sdk.transforms.windowing.Trigger} for more
-     * information on timestamps and watermarks.
-     *
-     * <p>May be called after {@link #advance} or {@link #start} has returned false, but not before
-     * {@link #start} has been called.
-     */
-    public abstract Instant getWatermark();
-
-    /**
-     * Returns a {@link CheckpointMark} representing the progress of this {@code UnboundedReader}.
-     *
-     * <p>The elements read up until this is called will be processed together as a bundle. Once
-     * the result of this processing has been durably committed,
-     * {@link CheckpointMark#finalizeCheckpoint} will be called on the {@link CheckpointMark}
-     * object.
-     *
-     * <p>The returned object should not be modified.
-     *
-     * <p>May be called after {@link #advance} or {@link #start} has returned false, but not before
-     * {@link #start} has been called.
-     */
-    public abstract CheckpointMark getCheckpointMark();
-
-    /**
-     * Constant representing an unknown amount of backlog.
-     */
-    public static final long BACKLOG_UNKNOWN = -1L;
-
-    /**
-     * Returns the size of the backlog of unread data in the underlying data source represented by
-     * this split of this source.
-     *
-     * <p>Either this or {@link #getTotalBacklogBytes} should be overridden in order to allow the
-     * runner to scale the amount of resources allocated to the pipeline.
-     */
-    public long getSplitBacklogBytes() {
-      return BACKLOG_UNKNOWN;
-    }
-
-    /**
-     * Returns the size of the backlog of unread data in the underlying data source represented by
-     * all splits of this source.
-     *
-     * <p>Either this or {@link #getSplitBacklogBytes} should be overridden in order to allow the
-     * runner to scale the amount of resources allocated to the pipeline.
-     */
-    public long getTotalBacklogBytes() {
-      return BACKLOG_UNKNOWN;
-    }
-
-    /**
-     * Returns the {@link UnboundedSource} that created this reader.  This will not change over the
-     * life of the reader.
-     */
-    @Override
-    public abstract UnboundedSource<OutputT, ?> getCurrentSource();
-  }
-}
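
For reference, a minimal, hypothetical skeleton of the UnboundedSource contract deleted above (a toy counter source). The class names, the processing-time watermark, and the use of SerializableCoder for the checkpoint mark are illustrative assumptions, as are the validate() and getDefaultOutputCoder() overrides expected by the Source base class in this SDK snapshot:

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import org.joda.time.Instant;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;

class CounterSource extends UnboundedSource<Long, CounterSource.Mark> {

  /** Checkpoint mark recording the last value emitted by a reader. */
  static class Mark implements UnboundedSource.CheckpointMark, Serializable {
    final long lastEmitted;
    Mark(long lastEmitted) { this.lastEmitted = lastEmitted; }
    @Override public void finalizeCheckpoint() { /* nothing to acknowledge */ }
  }

  @Override
  public List<CounterSource> generateInitialSplits(int desiredNumSplits, PipelineOptions options) {
    return Collections.singletonList(this);  // this toy source is not splittable
  }

  @Override
  public UnboundedReader<Long> createReader(PipelineOptions options, Mark checkpoint) {
    return new CounterReader(this, checkpoint == null ? 0L : checkpoint.lastEmitted + 1);
  }

  @Override
  public Coder<Mark> getCheckpointMarkCoder() { return SerializableCoder.of(Mark.class); }

  @Override public void validate() {}
  @Override public Coder<Long> getDefaultOutputCoder() { return VarLongCoder.of(); }

  /** Reader that emits an ever-increasing counter with processing-time timestamps. */
  static class CounterReader extends UnboundedReader<Long> {
    private final CounterSource source;
    private long current;

    CounterReader(CounterSource source, long start) {
      this.source = source;
      this.current = start - 1;
    }

    @Override public boolean start() { return advance(); }
    @Override public boolean advance() { current++; return true; }  // a real source would poll a backend
    @Override public Long getCurrent() { return current; }
    @Override public Instant getCurrentTimestamp() { return Instant.now(); }
    // Elements carry processing-time timestamps, so the watermark simply tracks the clock.
    @Override public Instant getWatermark() { return Instant.now(); }
    @Override public Mark getCheckpointMark() { return new Mark(current); }
    @Override public CounterSource getCurrentSource() { return source; }
    @Override public void close() {}
  }
}

Resuming from a checkpoint continues counting from Mark.lastEmitted + 1, which is the pattern the checkpointing notes above describe for avoiding re-reads after a failure.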

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java
deleted file mode 100644
index a70105f..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-import org.apache.beam.sdk.annotations.Experimental;
-
-import com.google.cloud.dataflow.sdk.Pipeline;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
-import com.google.cloud.dataflow.sdk.io.Sink.WriteOperation;
-import com.google.cloud.dataflow.sdk.io.Sink.Writer;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.View;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollectionView;
-import com.google.cloud.dataflow.sdk.values.PDone;
-
-import org.joda.time.Instant;
-
-import java.util.UUID;
-
-/**
- * A {@link PTransform} that writes to a {@link Sink}. A write begins with a sequential global
- * initialization of a sink, followed by a parallel write, and ends with a sequential finalization
- * of the write. The output of a write is {@link PDone}.  In the case of an empty PCollection, only
- * the global initialization and finalization will be performed.
- *
- * <p>Currently, only batch workflows can contain Write transforms.
- *
- * <p>Example usage:
- *
- * <p>{@code p.apply(Write.to(new MySink(...)));}
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public class Write {
-  /**
-   * Creates a Write transform that writes to the given Sink.
-   */
-  public static <T> Bound<T> to(Sink<T> sink) {
-    return new Bound<>(sink);
-  }
-
-  /**
-   * A {@link PTransform} that writes to a {@link Sink}. See {@link Write} and {@link Sink} for
-   * documentation about writing to Sinks.
-   */
-  public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
-    private final Sink<T> sink;
-
-    private Bound(Sink<T> sink) {
-      this.sink = sink;
-    }
-
-    @Override
-    public PDone apply(PCollection<T> input) {
-      PipelineOptions options = input.getPipeline().getOptions();
-      sink.validate(options);
-      return createWrite(input, sink.createWriteOperation(options));
-    }
-
-    /**
-     * Returns the {@link Sink} associated with this PTransform.
-     */
-    public Sink<T> getSink() {
-      return sink;
-    }
-
-    /**
-     * A write is performed as a sequence of three {@link ParDo}s.
-     *
-     * <p>In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's
-     * {@link WriteOperation}. In this initialization ParDo, {@link WriteOperation#initialize} is
-     * called. The output of this ParDo is a singleton PCollection
-     * containing the WriteOperation.
-     *
-     * <p>This singleton collection containing the WriteOperation is then used as a side input to a
-     * ParDo over the PCollection of elements to write. In this bundle-writing phase,
-     * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
-     * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and
-     * {@link DoFn#finishBundle}, respectively, and {@link Writer#write} method is called for every
-     * element in the bundle. The output of this ParDo is a PCollection of <i>writer result</i>
-     * objects (see {@link Sink} for a description of writer results), one for each bundle.
-     *
-     * <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input and
-     * the collection of writer results as a side-input. In this ParDo,
-     * {@link WriteOperation#finalize} is called to finalize the write.
-     *
-     * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
-     * before the exception that caused the write to fail is propagated and the write result will be
-     * discarded.
-     *
-     * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
-     * deserialized in the bundle-writing and finalization phases, any state change to the
-     * WriteOperation object that occurs during initialization is visible in the latter phases.
-     * However, the WriteOperation is not re-serialized after the bundle-writing phase, which is
-     * why implementations should guarantee that {@link WriteOperation#createWriter} does not
-     * mutate the WriteOperation.
-     */
-    private <WriteT> PDone createWrite(
-        PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
-      Pipeline p = input.getPipeline();
-
-      // A coder to use for the WriteOperation.
-      @SuppressWarnings("unchecked")
-      Coder<WriteOperation<T, WriteT>> operationCoder =
-          (Coder<WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass());
-
-      // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
-      // the sink.
-      PCollection<WriteOperation<T, WriteT>> operationCollection =
-          p.apply(Create.<WriteOperation<T, WriteT>>of(writeOperation).withCoder(operationCoder));
-
-      // Initialize the resource in a do-once ParDo on the WriteOperation.
-      operationCollection = operationCollection
-          .apply("Initialize", ParDo.of(
-              new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
-            @Override
-            public void processElement(ProcessContext c) throws Exception {
-              WriteOperation<T, WriteT> writeOperation = c.element();
-              writeOperation.initialize(c.getPipelineOptions());
-              // The WriteOperation is also the output of this ParDo, so it can have mutable
-              // state.
-              c.output(writeOperation);
-            }
-          }))
-          .setCoder(operationCoder);
-
-      // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase.
-      final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
-          operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
-
-      // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
-      // as a side input) and collect the results of the writes in a PCollection.
-      // There is a dependency between this ParDo and the first (the WriteOperation PCollection
-      // as a side input), so this will happen after the initial ParDo.
-      PCollection<WriteT> results = input
-          .apply("WriteBundles", ParDo.of(new DoFn<T, WriteT>() {
-            // Writer that will write the records in this bundle. Lazily
-            // initialized in processElement.
-            private Writer<T, WriteT> writer = null;
-
-            @Override
-            public void processElement(ProcessContext c) throws Exception {
-              // Lazily initialize the Writer
-              if (writer == null) {
-                WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
-                writer = writeOperation.createWriter(c.getPipelineOptions());
-                writer.open(UUID.randomUUID().toString());
-              }
-              try {
-                writer.write(c.element());
-              } catch (Exception e) {
-                // Discard write result and close the write.
-                try {
-                  writer.close();
-                } catch (Exception closeException) {
-                  // Do not mask the exception that caused the write to fail.
-                }
-                throw e;
-              }
-            }
-
-            @Override
-            public void finishBundle(Context c) throws Exception {
-              if (writer != null) {
-                WriteT result = writer.close();
-                // Output the result of the write.
-                c.outputWithTimestamp(result, Instant.now());
-              }
-            }
-          }).withSideInputs(writeOperationView))
-          .setCoder(writeOperation.getWriterResultCoder())
-          .apply(Window.<WriteT>into(new GlobalWindows()));
-
-      final PCollectionView<Iterable<WriteT>> resultsView =
-          results.apply(View.<WriteT>asIterable());
-
-      // Finalize the write in another do-once ParDo on the singleton collection containing the
-      // Writer. The results from the per-bundle writes are given as an Iterable side input.
-      // The WriteOperation's state is the same as after its initialization in the first do-once
-      // ParDo. There is a dependency between this ParDo and the parallel write (the writer results
-      // collection as a side input), so it will happen after the parallel write.
-      @SuppressWarnings("unused")
-      final PCollection<Integer> done = operationCollection
-          .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
-            @Override
-            public void processElement(ProcessContext c) throws Exception {
-              Iterable<WriteT> results = c.sideInput(resultsView);
-              WriteOperation<T, WriteT> writeOperation = c.element();
-              writeOperation.finalize(results, c.getPipelineOptions());
-            }
-          }).withSideInputs(resultsView));
-      return PDone.in(input.getPipeline());
-    }
-  }
-}
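
A note on the serialization contract described in the createWrite javadoc above: state set on the WriteOperation during initialize() is visible to the bundle-writing and finalize phases because the object is encoded once (with a SerializableCoder) after the "Initialize" ParDo, while mutations made in later phases are never re-encoded. The following is a minimal standalone sketch of that round-trip behavior, not SDK code: ToyOperation is a stand-in class, the directory strings are arbitrary, and it assumes CoderUtils.decodeFromByteArray is the decoding counterpart of the encodeToByteArray call used in XmlSink's writer below.

import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.util.CoderUtils;

import java.io.Serializable;

public class WriteOperationSerializationSketch {
  // Stand-in for a Sink.WriteOperation; only its serialization behavior is modeled here.
  static class ToyOperation implements Serializable {
    String tempDirectory;
  }

  public static void main(String[] args) throws Exception {
    ToyOperation op = new ToyOperation();
    op.tempDirectory = "/tmp/write-abc123";  // state set during the "Initialize" phase

    // The WriteOperation is encoded once, after the Initialize ParDo.
    SerializableCoder<ToyOperation> coder = SerializableCoder.of(ToyOperation.class);
    byte[] encoded = CoderUtils.encodeToByteArray(coder, op);

    // Bundle-writing phase: each worker decodes its own copy, so a mutation made
    // here is never observed by the finalize phase.
    ToyOperation bundleCopy = CoderUtils.decodeFromByteArray(coder, encoded);
    bundleCopy.tempDirectory = "/tmp/somewhere-else";

    // Finalize phase: decodes the same bytes that were produced after initialization.
    ToyOperation finalizeCopy = CoderUtils.decodeFromByteArray(coder, encoded);
    System.out.println(finalizeCopy.tempDirectory);  // prints /tmp/write-abc123
  }
}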

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java
deleted file mode 100644
index 9ba76c5..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriteOperation;
-import com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriter;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.Preconditions;
-
-import java.io.OutputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-
-// CHECKSTYLE.OFF: JavadocStyle
-/**
- * A {@link Sink} that outputs records as XML-formatted elements. Writes a {@link PCollection} of
- * records from JAXB-annotated classes to a single file location.
- *
- * <p>Given a PCollection containing records of type T that can be marshalled to XML elements, this
- * Sink will produce one output file per bundle, each consisting of a single root element that
- * contains the elements written to that file.
- *
- * <p>XML Sinks are created with a base filename to write to, a root element name that will be used
- * for the root element of the output files, and a class to bind to an XML element. This class
- * will be used in the marshalling of records in an input PCollection to their XML representation
- * and must be able to be bound using JAXB annotations (checked at pipeline construction time).
- *
- * <p>XML Sinks can be written to using the {@link Write} transform:
- *
- * <pre>
- * p.apply(Write.to(
- *      XmlSink.write()
- *          .ofRecordClass(Type.class)
- *          .withRootElement(root_element)
- *          .toFilenamePrefix(output_filename)));
- * </pre>
- *
- * <p>For example, consider the following class with JAXB annotations:
- *
- * <pre>
- *  {@literal @}XmlRootElement(name = "word_count_result")
- *  {@literal @}XmlType(propOrder = {"word", "frequency"})
- *  public class WordFrequency {
- *    private String word;
- *    private long frequency;
- *
- *    public WordFrequency() { }
- *
- *    public WordFrequency(String word, long frequency) {
- *      this.word = word;
- *      this.frequency = frequency;
- *    }
- *
- *    public void setWord(String word) {
- *      this.word = word;
- *    }
- *
- *    public void setFrequency(long frequency) {
- *      this.frequency = frequency;
- *    }
- *
- *    public long getFrequency() {
- *      return frequency;
- *    }
- *
- *    public String getWord() {
- *      return word;
- *    }
- *  }
- * </pre>
- *
- * <p>The following will produce XML output with a root element named "words" from a PCollection of
- * WordFrequency objects:
- * <pre>
- * p.apply(Write.to(
- *  XmlSink.write()
- *      .ofRecordClass(WordFrequency.class)
- *      .withRootElement("words")
- *      .toFilenamePrefix(output_file)));
- * </pre>
- *
- * <p>The output will look like the following:
- * <pre>
- * {@code
- * <words>
- *
- *  <word_count_result>
- *    <word>decreased</word>
- *    <frequency>1</frequency>
- *  </word_count_result>
- *
- *  <word_count_result>
- *    <word>War</word>
- *    <frequency>4</frequency>
- *  </word_count_result>
- *
- *  <word_count_result>
- *    <word>empress'</word>
- *    <frequency>14</frequency>
- *  </word_count_result>
- *
- *  <word_count_result>
- *    <word>stoops</word>
- *    <frequency>6</frequency>
- *  </word_count_result>
- *
- *  ...
- * </words>
- * }</pre>
- */
-// CHECKSTYLE.ON: JavadocStyle
-@SuppressWarnings("checkstyle:javadocstyle")
-public class XmlSink {
-  protected static final String XML_EXTENSION = "xml";
-
-  /**
-   * Returns a builder for an XmlSink. You'll need to configure the class to bind, the root
-   * element name, and the output file prefix with {@link Bound#ofRecordClass}, {@link
-   * Bound#withRootElement}, and {@link Bound#toFilenamePrefix}, respectively.
-   */
-  public static Bound<?> write() {
-    return new Bound<>(null, null, null);
-  }
-
-  /**
-   * Returns an XmlSink that writes objects as XML elements.
-   *
-   * <p>Output files will have the name {@literal {baseOutputFilename}-0000i-of-0000n.xml} where n
-   * is the number of output bundles that the Dataflow service divides the output into.
-   *
-   * @param klass the class of the elements to write.
-   * @param rootElementName the enclosing root element.
-   * @param baseOutputFilename the output filename prefix.
-   */
-  public static <T> Bound<T> writeOf(
-      Class<T> klass, String rootElementName, String baseOutputFilename) {
-    return new Bound<>(klass, rootElementName, baseOutputFilename);
-  }
-
-  /**
-   * A {@link FileBasedSink} that writes objects as XML elements.
-   */
-  public static class Bound<T> extends FileBasedSink<T> {
-    final Class<T> classToBind;
-    final String rootElementName;
-
-    private Bound(Class<T> classToBind, String rootElementName, String baseOutputFilename) {
-      super(baseOutputFilename, XML_EXTENSION);
-      this.classToBind = classToBind;
-      this.rootElementName = rootElementName;
-    }
-
-    /**
-     * Returns an XmlSink that writes objects of the specified class as XML elements.
-     *
-     * <p>The specified class must be able to be used to create a JAXB context.
-     */
-    public <T> Bound<T> ofRecordClass(Class<T> classToBind) {
-      return new Bound<>(classToBind, rootElementName, baseOutputFilename);
-    }
-
-    /**
-     * Returns an XmlSink that writes to files with the given prefix.
-     *
-     * <p>Output files will have the name {@literal {filenamePrefix}-0000i-of-0000n.xml} where n is
-     * the number of output bundles that the Dataflow service divides the output into.
-     */
-    public Bound<T> toFilenamePrefix(String baseOutputFilename) {
-      return new Bound<>(classToBind, rootElementName, baseOutputFilename);
-    }
-
-    /**
-     * Returns an XmlSink that writes XML files with an enclosing root element of the
-     * supplied name.
-     */
-    public Bound<T> withRootElement(String rootElementName) {
-      return new Bound<>(classToBind, rootElementName, baseOutputFilename);
-    }
-
-    /**
-     * Validates that the root element, class to bind to a JAXB context, and filenamePrefix have
-     * been set and that the class can be bound in a JAXB context.
-     */
-    @Override
-    public void validate(PipelineOptions options) {
-      Preconditions.checkNotNull(classToBind, "Missing a class to bind to a JAXB context.");
-      Preconditions.checkNotNull(rootElementName, "Missing a root element name.");
-      Preconditions.checkNotNull(baseOutputFilename, "Missing a filename to write to.");
-      try {
-        JAXBContext.newInstance(classToBind);
-      } catch (JAXBException e) {
-        throw new RuntimeException("Error binding classes to a JAXB Context.", e);
-      }
-    }
-
-    /**
-     * Creates an {@link XmlWriteOperation}.
-     */
-    @Override
-    public XmlWriteOperation<T> createWriteOperation(PipelineOptions options) {
-      return new XmlWriteOperation<>(this);
-    }
-  }
-
-  /**
-   * {@link Sink.WriteOperation} for XML {@link Sink}s.
-   */
-  protected static final class XmlWriteOperation<T> extends FileBasedWriteOperation<T> {
-    public XmlWriteOperation(XmlSink.Bound<T> sink) {
-      super(sink);
-    }
-
-    /**
-     * Creates an {@link XmlWriter} with a marshaller for the type it will write.
-     */
-    @Override
-    public XmlWriter<T> createWriter(PipelineOptions options) throws Exception {
-      JAXBContext context = JAXBContext.newInstance(getSink().classToBind);
-      Marshaller marshaller = context.createMarshaller();
-      marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
-      marshaller.setProperty(Marshaller.JAXB_FRAGMENT, Boolean.TRUE);
-      marshaller.setProperty(Marshaller.JAXB_ENCODING, "UTF-8");
-      return new XmlWriter<>(this, marshaller);
-    }
-
-    /**
-     * Returns the XmlSink.Bound for this write operation.
-     */
-    @Override
-    public XmlSink.Bound<T> getSink() {
-      return (XmlSink.Bound<T>) super.getSink();
-    }
-  }
-
-  /**
-   * A {@link Sink.Writer} that can write objects as XML elements.
-   */
-  protected static final class XmlWriter<T> extends FileBasedWriter<T> {
-    final Marshaller marshaller;
-    private OutputStream os = null;
-
-    public XmlWriter(XmlWriteOperation<T> writeOperation, Marshaller marshaller) {
-      super(writeOperation);
-      this.marshaller = marshaller;
-    }
-
-    /**
-     * Creates the output stream that elements will be written to.
-     */
-    @Override
-    protected void prepareWrite(WritableByteChannel channel) throws Exception {
-      os = Channels.newOutputStream(channel);
-    }
-
-    /**
-     * Writes the root element opening tag.
-     */
-    @Override
-    protected void writeHeader() throws Exception {
-      String rootElementName = getWriteOperation().getSink().rootElementName;
-      os.write(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "<" + rootElementName + ">\n"));
-    }
-
-    /**
-     * Writes the root element closing tag.
-     */
-    @Override
-    protected void writeFooter() throws Exception {
-      String rootElementName = getWriteOperation().getSink().rootElementName;
-      os.write(CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "\n</" + rootElementName + ">"));
-    }
-
-    /**
-     * Writes a value to the stream.
-     */
-    @Override
-    public void write(T value) throws Exception {
-      marshaller.marshal(value, os);
-    }
-
-    /**
-     * Returns the XmlWriteOperation that this writer belongs to.
-     */
-    @Override
-    public XmlWriteOperation<T> getWriteOperation() {
-      return (XmlWriteOperation<T>) super.getWriteOperation();
-    }
-  }
-}
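
Putting the two files together, here is a minimal sketch of writing a small in-memory PCollection through Write and XmlSink with the SDK's default local runner. WordFrequency mirrors the class from the XmlSink javadoc, with Serializable added only so that Create can encode the elements via SerializableCoder; the sample data and the /tmp/word_counts output prefix are arbitrary choices for illustration.

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.io.Write;
import com.google.cloud.dataflow.sdk.io.XmlSink;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Create;

import java.io.Serializable;

import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlType;

public class XmlSinkExample {
  // JAXB-annotated record type, as in the XmlSink javadoc; Serializable is added so
  // that Create can use a SerializableCoder for the in-memory elements.
  @XmlRootElement(name = "word_count_result")
  @XmlType(propOrder = {"word", "frequency"})
  public static class WordFrequency implements Serializable {
    private String word;
    private long frequency;

    public WordFrequency() {}

    public WordFrequency(String word, long frequency) {
      this.word = word;
      this.frequency = frequency;
    }

    public String getWord() { return word; }
    public void setWord(String word) { this.word = word; }
    public long getFrequency() { return frequency; }
    public void setFrequency(long frequency) { this.frequency = frequency; }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(Create.of(new WordFrequency("decreased", 1L), new WordFrequency("War", 4L))
            .withCoder(SerializableCoder.of(WordFrequency.class)))
     .apply(Write.to(
         XmlSink.writeOf(WordFrequency.class, "words", "/tmp/word_counts")));

    p.run();
  }
}

Each output bundle then yields a sharded file under the /tmp/word_counts prefix (shard naming per FileBasedSink) whose root element is <words>.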

