beam-commits mailing list archives

From dhalp...@apache.org
Subject [42/67] [partial] incubator-beam git commit: Directory reorganization
Date Thu, 24 Mar 2016 02:48:06 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java
deleted file mode 100644
index 5d32a9d..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSource.java
+++ /dev/null
@@ -1,648 +0,0 @@
-/*
- * Copyright (C) 2014 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.util.IOChannelFactory;
-import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.SeekableByteChannel;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.NoSuchElementException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-
-/**
- * A common base class for all file-based {@link Source}s. Extend this class to implement your own
- * file-based custom source.
- *
- * <p>A file-based {@code Source} is a {@code Source} backed by a file pattern defined as a Java
- * glob, a single file, or an offset range for a single file. See {@link OffsetBasedSource} and
- * {@link com.google.cloud.dataflow.sdk.io.range.RangeTracker} for semantics of offset ranges.
- *
- * <p>This source stores a {@code String} that is an {@link IOChannelFactory} specification for a
- * file or file pattern. There should be an {@code IOChannelFactory} defined for the file
- * specification provided. Please refer to {@link IOChannelUtils} and {@link IOChannelFactory} for
- * more information on this.
- *
- * <p>In addition to the methods left abstract from {@code BoundedSource}, subclasses must implement
- * methods to create a sub-source and a reader for a range of a single file -
- * {@link #createForSubrangeOfFile} and {@link #createSingleFileReader}. Please refer to
- * {@link XmlSource} for an example implementation of {@code FileBasedSource}.
- *
- * @param <T> Type of records represented by the source.
- */
-public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
-  private static final Logger LOG = LoggerFactory.getLogger(FileBasedSource.class);
-  private static final float FRACTION_OF_FILES_TO_STAT = 0.01f;
-
-  // Package-private for testing
-  static final int MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT = 100;
-
-  // Size of the thread pool to be used for performing file operations in parallel.
-  // Package-private for testing.
-  static final int THREAD_POOL_SIZE = 128;
-
-  private final String fileOrPatternSpec;
-  private final Mode mode;
-
-  /**
-   * A given {@code FileBasedSource} represents a file resource of one of these types.
-   */
-  public enum Mode {
-    FILEPATTERN,
-    SINGLE_FILE_OR_SUBRANGE
-  }
-
-  /**
-   * Create a {@code FileBasedSource} based on a file or a file pattern specification. This
-   * constructor must be used when creating a new {@code FileBasedSource} for a file pattern.
-   *
-   * <p>See {@link OffsetBasedSource} for a detailed description of {@code minBundleSize}.
-   *
-   * @param fileOrPatternSpec {@link IOChannelFactory} specification of file or file pattern
-   *        represented by the {@link FileBasedSource}.
-   * @param minBundleSize minimum bundle size in bytes.
-   */
-  public FileBasedSource(String fileOrPatternSpec, long minBundleSize) {
-    super(0, Long.MAX_VALUE, minBundleSize);
-    mode = Mode.FILEPATTERN;
-    this.fileOrPatternSpec = fileOrPatternSpec;
-  }
-
-  /**
-   * Create a {@code FileBasedSource} based on a single file. This constructor must be used when
-   * creating a new {@code FileBasedSource} for a subrange of a single file.
-   * Additionally, this constructor must be used to create new {@code FileBasedSource}s when
-   * subclasses implement the method {@link #createForSubrangeOfFile}.
-   *
-   * <p>See {@link OffsetBasedSource} for detailed descriptions of {@code minBundleSize},
-   * {@code startOffset}, and {@code endOffset}.
-   *
-   * @param fileName {@link IOChannelFactory} specification of the file represented by the
-   *        {@link FileBasedSource}.
-   * @param minBundleSize minimum bundle size in bytes.
-   * @param startOffset starting byte offset.
-   * @param endOffset ending byte offset. If the specified value is {@code >= #getMaxEndOffset()},
-   *        it is treated as {@code #getMaxEndOffset()}.
-   */
-  public FileBasedSource(String fileName, long minBundleSize,
-      long startOffset, long endOffset) {
-    super(startOffset, endOffset, minBundleSize);
-    mode = Mode.SINGLE_FILE_OR_SUBRANGE;
-    this.fileOrPatternSpec = fileName;
-  }
-
-  public final String getFileOrPatternSpec() {
-    return fileOrPatternSpec;
-  }
-
-  public final Mode getMode() {
-    return mode;
-  }
-
-  @Override
-  public final FileBasedSource<T> createSourceForSubrange(long start, long end) {
-    Preconditions.checkArgument(mode != Mode.FILEPATTERN,
-        "Cannot split a file pattern based source based on positions");
-    Preconditions.checkArgument(start >= getStartOffset(), "Start offset value " + start
-        + " of the subrange cannot be smaller than the start offset value " + getStartOffset()
-        + " of the parent source");
-    Preconditions.checkArgument(end <= getEndOffset(), "End offset value " + end
-        + " of the subrange cannot be larger than the end offset value " + getEndOffset()
-        + " of the parent source");
-
-    FileBasedSource<T> source = createForSubrangeOfFile(fileOrPatternSpec, start, end);
-    if (start > 0 || end != Long.MAX_VALUE) {
-      Preconditions.checkArgument(source.getMode() == Mode.SINGLE_FILE_OR_SUBRANGE,
-          "Source created for the range [" + start + "," + end + ")"
-          + " must be a subrange source");
-    }
-    return source;
-  }
-
-  /**
-   * Creates and returns a new {@code FileBasedSource} of the same type as the current
-   * {@code FileBasedSource} backed by a given file and an offset range. When the current source
-   * is being split, this method is used to generate new sub-sources. When creating the source,
-   * subclasses must call the constructor {@link #FileBasedSource(String, long, long, long)} of
-   * {@code FileBasedSource} with the corresponding parameter values passed here.
-   *
-   * @param fileName file backing the new {@code FileBasedSource}.
-   * @param start starting byte offset of the new {@code FileBasedSource}.
-   * @param end ending byte offset of the new {@code FileBasedSource}. May be Long.MAX_VALUE,
-   *        in which case it will be inferred using {@link #getMaxEndOffset}.
-   */
-  protected abstract FileBasedSource<T> createForSubrangeOfFile(
-      String fileName, long start, long end);
-
-  /**
-   * Creates and returns an instance of a {@code FileBasedReader} implementation for the current
-   * source, assuming the source represents a single file. File patterns are handled automatically
-   * by the {@code FileBasedSource} implementation.
-   */
-  protected abstract FileBasedReader<T> createSingleFileReader(
-      PipelineOptions options);
-
-  @Override
-  public final long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-    // This implementation of method getEstimatedSizeBytes is provided to simplify subclasses. Here
-    // we perform the size estimation of files and file patterns using the interface provided by
-    // IOChannelFactory.
-
-    IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
-    if (mode == Mode.FILEPATTERN) {
-      // TODO Implement a more efficient parallel/batch size estimation mechanism for file patterns.
-      long startTime = System.currentTimeMillis();
-      long totalSize = 0;
-      Collection<String> inputs = factory.match(fileOrPatternSpec);
-      if (inputs.size() <= MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT) {
-        totalSize = getExactTotalSizeOfFiles(inputs, factory);
-        LOG.debug("Size estimation of all files of pattern " + fileOrPatternSpec + " took "
-           + (System.currentTimeMillis() - startTime) + " ms");
-      } else {
-        totalSize = getEstimatedSizeOfFilesBySampling(inputs, factory);
-        LOG.debug("Size estimation of pattern " + fileOrPatternSpec + " by sampling took "
-           + (System.currentTimeMillis() - startTime) + " ms");
-      }
-      return totalSize;
-    } else {
-      long start = getStartOffset();
-      long end = Math.min(getEndOffset(), getMaxEndOffset(options));
-      return end - start;
-    }
-  }
-
-  // Get the exact total size of the given set of files.
-  // Invokes multiple requests for size estimation in parallel using a thread pool.
-  // TODO: replace this with bulk request API when it is available. Will require updates
-  // to IOChannelFactory interface.
-  private static long getExactTotalSizeOfFiles(
-      Collection<String> files, IOChannelFactory ioChannelFactory) throws Exception {
-    List<ListenableFuture<Long>> futures = new ArrayList<>();
-    ListeningExecutorService service =
-        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_POOL_SIZE));
-    long totalSize = 0;
-    try {
-      for (String file : files) {
-        futures.add(createFutureForSizeEstimation(file, ioChannelFactory, service));
-      }
-
-      for (Long val : Futures.allAsList(futures).get()) {
-        totalSize += val;
-      }
-
-      return totalSize;
-    } finally {
-      service.shutdown();
-    }
-  }
-
-  private static ListenableFuture<Long> createFutureForSizeEstimation(
-      final String file,
-      final IOChannelFactory ioChannelFactory,
-      ListeningExecutorService service) {
-    return service.submit(
-        new Callable<Long>() {
-          @Override
-          public Long call() throws Exception {
-            return ioChannelFactory.getSizeBytes(file);
-          }
-        });
-  }
-
-  // Estimate the total size of the given set of files through sampling and extrapolation.
-  // Currently we use uniform sampling which requires a linear sampling size for a reasonable
-  // estimate.
-  // TODO: Implement a more efficient sampling mechanism.
-  private static long getEstimatedSizeOfFilesBySampling(
-      Collection<String> files, IOChannelFactory ioChannelFactory) throws Exception {
-    int sampleSize = (int) (FRACTION_OF_FILES_TO_STAT * files.size());
-    sampleSize = Math.max(MAX_NUMBER_OF_FILES_FOR_AN_EXACT_STAT, sampleSize);
-
-    List<String> selectedFiles = new ArrayList<String>(files);
-    Collections.shuffle(selectedFiles);
-    selectedFiles = selectedFiles.subList(0, sampleSize);
-
-    return files.size() * getExactTotalSizeOfFiles(selectedFiles, ioChannelFactory)
-        / selectedFiles.size();
-  }
-
-  private ListenableFuture<List<? extends FileBasedSource<T>>> createFutureForFileSplit(
-      final String file,
-      final long desiredBundleSizeBytes,
-      final PipelineOptions options,
-      ListeningExecutorService service) {
-    return service.submit(new Callable<List<? extends FileBasedSource<T>>>() {
-      @Override
-      public List<? extends FileBasedSource<T>> call() throws Exception {
-        return createForSubrangeOfFile(file, 0, Long.MAX_VALUE)
-            .splitIntoBundles(desiredBundleSizeBytes, options);
-      }
-    });
-  }
-
-  @Override
-  public final List<? extends FileBasedSource<T>> splitIntoBundles(
-      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-    // This implementation of method splitIntoBundles is provided to simplify subclasses. Here we
-    // split a FileBasedSource based on a file pattern into FileBasedSources based on full single
-    // files. For files that support efficient seeking, we further split those FileBasedSources
-    // into FileBasedSources based on subranges of single files.
-
-    if (mode == Mode.FILEPATTERN) {
-      long startTime = System.currentTimeMillis();
-      List<ListenableFuture<List<? extends FileBasedSource<T>>>> futures = new ArrayList<>();
-
-      ListeningExecutorService service =
-          MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(THREAD_POOL_SIZE));
-      try {
-        for (final String file : FileBasedSource.expandFilePattern(fileOrPatternSpec)) {
-          futures.add(createFutureForFileSplit(file, desiredBundleSizeBytes, options, service));
-        }
-        List<? extends FileBasedSource<T>> splitResults =
-            ImmutableList.copyOf(Iterables.concat(Futures.allAsList(futures).get()));
-        LOG.debug(
-            "Splitting the source based on file pattern "
-                + fileOrPatternSpec
-                + " took "
-                + (System.currentTimeMillis() - startTime)
-                + " ms");
-        return splitResults;
-      } finally {
-        service.shutdown();
-      }
-    } else {
-      if (isSplittable()) {
-        List<FileBasedSource<T>> splitResults = new ArrayList<>();
-        for (OffsetBasedSource<T> split :
-            super.splitIntoBundles(desiredBundleSizeBytes, options)) {
-          splitResults.add((FileBasedSource<T>) split);
-        }
-        return splitResults;
-      } else {
-        LOG.debug("The source for file " + fileOrPatternSpec
-            + " is not split into sub-range based sources since the file is not seekable");
-        return ImmutableList.of(this);
-      }
-    }
-  }
-
-  /**
-   * Determines whether a file represented by this source can be split into bundles.
-   *
-   * <p>By default, a file is splittable if it is on a file system that supports efficient read
-   * seeking. Subclasses may override to provide different behavior.
-   */
-  protected boolean isSplittable() throws Exception {
-    // We split a file-based source into subranges only if the file is efficiently seekable.
-    // If a file is not efficiently seekable it would be highly inefficient to create and read a
-    // source based on a subrange of that file.
-    IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
-    return factory.isReadSeekEfficient(fileOrPatternSpec);
-  }
-
-  @Override
-  public final BoundedReader<T> createReader(PipelineOptions options) throws IOException {
-    // Validate the current source prior to creating a reader for it.
-    this.validate();
-
-    if (mode == Mode.FILEPATTERN) {
-      long startTime = System.currentTimeMillis();
-      Collection<String> files = FileBasedSource.expandFilePattern(fileOrPatternSpec);
-      List<FileBasedReader<T>> fileReaders = new ArrayList<>();
-      for (String fileName : files) {
-        long endOffset;
-        try {
-          endOffset = IOChannelUtils.getFactory(fileName).getSizeBytes(fileName);
-        } catch (IOException e) {
-          LOG.warn("Failed to get size of " + fileName, e);
-          endOffset = Long.MAX_VALUE;
-        }
-        fileReaders.add(
-            createForSubrangeOfFile(fileName, 0, endOffset).createSingleFileReader(options));
-      }
-      LOG.debug("Creating a reader for file pattern " + fileOrPatternSpec + " took "
-          + (System.currentTimeMillis() - startTime) + " ms");
-      if (fileReaders.size() == 1) {
-        return fileReaders.get(0);
-      }
-      return new FilePatternReader(this, fileReaders);
-    } else {
-      return createSingleFileReader(options);
-    }
-  }
-
-  @Override
-  public String toString() {
-    switch (mode) {
-      case FILEPATTERN:
-        return fileOrPatternSpec;
-      case SINGLE_FILE_OR_SUBRANGE:
-        return fileOrPatternSpec + " range " + super.toString();
-      default:
-        throw new IllegalStateException("Unexpected mode: " + mode);
-    }
-  }
-
-  @Override
-  public void validate() {
-    super.validate();
-    switch (mode) {
-      case FILEPATTERN:
-        Preconditions.checkArgument(getStartOffset() == 0,
-            "FileBasedSource is based on a file pattern or a full single file "
-            + "but the starting offset proposed " + getStartOffset() + " is not zero");
-        Preconditions.checkArgument(getEndOffset() == Long.MAX_VALUE,
-            "FileBasedSource is based on a file pattern or a full single file "
-            + "but the ending offset proposed " + getEndOffset() + " is not Long.MAX_VALUE");
-        break;
-      case SINGLE_FILE_OR_SUBRANGE:
-        // Nothing more to validate.
-        break;
-      default:
-        throw new IllegalStateException("Unknown mode: " + mode);
-    }
-  }
-
-  @Override
-  public final long getMaxEndOffset(PipelineOptions options) throws Exception {
-    if (mode == Mode.FILEPATTERN) {
-      throw new IllegalArgumentException("Cannot determine the exact end offset of a file pattern");
-    }
-    if (getEndOffset() == Long.MAX_VALUE) {
-      IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
-      return factory.getSizeBytes(fileOrPatternSpec);
-    } else {
-      return getEndOffset();
-    }
-  }
-
-  protected static final Collection<String> expandFilePattern(String fileOrPatternSpec)
-      throws IOException {
-    IOChannelFactory factory = IOChannelUtils.getFactory(fileOrPatternSpec);
-    Collection<String> matches = factory.match(fileOrPatternSpec);
-    LOG.info("Matched {} files for pattern {}", matches.size(), fileOrPatternSpec);
-    return matches;
-  }
-
-  /**
-   * A {@link Source.Reader reader} that implements code common to readers of
-   * {@code FileBasedSource}s.
-   *
-   * <h2>Seekability</h2>
-   *
-   * <p>This reader uses a {@link ReadableByteChannel} created for the file represented by the
-   * corresponding source to efficiently move to the correct starting position defined in the
-   * source. Subclasses of this reader should implement {@link #startReading} to get access to this
-   * channel. If the source corresponding to the reader is for a subrange of a file the
-   * {@code ReadableByteChannel} provided is guaranteed to be an instance of the type
-   * {@link SeekableByteChannel}, which may be used by the subclass to traverse back in the channel to
-   * determine the correct starting position.
-   *
-   * <h2>Reading Records</h2>
-   *
-   * <p>Sequential reading is implemented using {@link #readNextRecord}.
-   *
-   * <p>Then {@code FileBasedReader} implements "reading a range [A, B)" in the following way.
-   * <ol>
-   * <li>{@link #start} opens the file
-   * <li>{@link #start} seeks the {@code SeekableByteChannel} to A (reading offset ranges for
-   * non-seekable files is not supported) and calls {@code startReading()}
-   * <li>{@link #start} calls {@link #advance} once, which, via {@link #readNextRecord},
-   * locates the first record which is at a split point AND its offset is at or after A.
-   * If this record is at or after B, {@link #advance} returns false and reading is finished.
-   * <li>if the previous advance call returned {@code true} sequential reading starts and
-   * {@code advance()} will be called repeatedly
-   * </ol>
-   * {@code advance()} calls {@code readNextRecord()} on the subclass, and stops (returns false) if
-   * the new record is at a split point AND the offset of the new record is at or after B.
-   *
-   * <h2>Thread Safety</h2>
-   *
-   * <p>Since this class implements {@link Source.Reader} it guarantees thread safety. Abstract
-   * methods defined here will not be accessed by more than one thread concurrently.
-   */
-  public abstract static class FileBasedReader<T> extends OffsetBasedReader<T> {
-    private ReadableByteChannel channel = null;
-
-    /**
-     * Subclasses should not perform IO operations in the constructor. All IO operations should be
-     * delayed until the {@link #startReading} method is invoked.
-     */
-    public FileBasedReader(FileBasedSource<T> source) {
-      super(source);
-      Preconditions.checkArgument(source.getMode() != Mode.FILEPATTERN,
-          "FileBasedReader does not support reading file patterns");
-    }
-
-    @Override
-    public FileBasedSource<T> getCurrentSource() {
-      return (FileBasedSource<T>) super.getCurrentSource();
-    }
-
-    @Override
-    protected final boolean startImpl() throws IOException {
-      FileBasedSource<T> source = getCurrentSource();
-      IOChannelFactory factory = IOChannelUtils.getFactory(source.getFileOrPatternSpec());
-      this.channel = factory.open(source.getFileOrPatternSpec());
-
-      if (channel instanceof SeekableByteChannel) {
-        SeekableByteChannel seekChannel = (SeekableByteChannel) channel;
-        seekChannel.position(source.getStartOffset());
-      } else {
-        // Channel is not seekable. Must not be a subrange.
-        Preconditions.checkArgument(source.mode != Mode.SINGLE_FILE_OR_SUBRANGE,
-            "Subrange-based sources must only be defined for file types that support seekable "
-            + " read channels");
-        Preconditions.checkArgument(source.getStartOffset() == 0, "Start offset "
-            + source.getStartOffset()
-            + " is not zero but channel for reading the file is not seekable.");
-      }
-
-      startReading(channel);
-
-      // Advance once to load the first record.
-      return advanceImpl();
-    }
-
-    @Override
-    protected final boolean advanceImpl() throws IOException {
-      return readNextRecord();
-    }
-
-    /**
-     * Closes any {@link ReadableByteChannel} created for the current reader. This implementation is
-     * idempotent. Any {@code close()} method introduced by a subclass must be idempotent and must
-     * call the {@code close()} method in the {@code FileBasedReader}.
-     */
-    @Override
-    public void close() throws IOException {
-      if (channel != null) {
-        channel.close();
-      }
-    }
-
-    /**
-     * Performs any initialization of the subclass of {@code FileBasedReader} that involves IO
-     * operations. Will only be invoked once, and before that invocation the base class will seek the
-     * channel to the source's starting offset.
-     *
-     * <p>Provided {@link ReadableByteChannel} is for the file represented by the source of this
-     * reader. Subclasses may use the {@code channel} to build a higher-level IO abstraction, e.g., a
-     * BufferedReader or an XML parser.
-     *
-     * <p>If the corresponding source is for a subrange of a file, {@code channel} is guaranteed to
-     * be an instance of the type {@link SeekableByteChannel}.
-     *
-     * <p>After this method is invoked the base class will not be reading data from the channel or
-     * adjusting the position of the channel. But the base class is responsible for properly closing
-     * the channel.
-     *
-     * @param channel a byte channel representing the file backing the reader.
-     */
-    protected abstract void startReading(ReadableByteChannel channel) throws IOException;
-
-    /**
-     * Reads the next record from the channel provided by {@link #startReading}. Methods
-     * {@link #getCurrent}, {@link #getCurrentOffset}, and {@link #isAtSplitPoint()} should return
-     * the corresponding information about the record read by the last invocation of this method.
-     *
-     * <p>Note that this method will be called the same way for reading the first record in the
-     * source (file or offset range in the file) and for reading subsequent records. It is up to the
-     * subclass to do anything special for locating and reading the first record, if necessary.
-     *
-     * @return {@code true} if a record was successfully read, {@code false} if the end of the
-     *         channel was reached before successfully reading a new record.
-     */
-    protected abstract boolean readNextRecord() throws IOException;
-  }
-
-  // An internal Reader implementation that concatenates a sequence of FileBasedReaders.
-  private class FilePatternReader extends BoundedReader<T> {
-    private final FileBasedSource<T> source;
-    private final List<FileBasedReader<T>> fileReaders;
-    final ListIterator<FileBasedReader<T>> fileReadersIterator;
-    FileBasedReader<T> currentReader = null;
-
-    public FilePatternReader(FileBasedSource<T> source, List<FileBasedReader<T>> fileReaders) {
-      this.source = source;
-      this.fileReaders = fileReaders;
-      this.fileReadersIterator = fileReaders.listIterator();
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      return startNextNonemptyReader();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      Preconditions.checkState(currentReader != null, "Call start() before advance()");
-      if (currentReader.advance()) {
-        return true;
-      }
-      return startNextNonemptyReader();
-    }
-
-    private boolean startNextNonemptyReader() throws IOException {
-      while (fileReadersIterator.hasNext()) {
-        currentReader = fileReadersIterator.next();
-        if (currentReader.start()) {
-          return true;
-        }
-        currentReader.close();
-      }
-      return false;
-    }
-
-    @Override
-    public T getCurrent() throws NoSuchElementException {
-      // A NoSuchElementException will be thrown by the last FileBasedReader if getCurrent() is called after
-      // advance() returns false.
-      return currentReader.getCurrent();
-    }
-
-    @Override
-    public Instant getCurrentTimestamp() throws NoSuchElementException {
-      // A NoSuchElementException will be thrown by the last FileBasedReader if getCurrentTimestamp()
-      // is called after advance() returns false.
-      return currentReader.getCurrentTimestamp();
-    }
-
-    @Override
-    public void close() throws IOException {
-      // Close all readers that may have not yet been closed.
-      // If this reader has not been started, currentReader is null.
-      if (currentReader != null) {
-        currentReader.close();
-      }
-      while (fileReadersIterator.hasNext()) {
-        fileReadersIterator.next().close();
-      }
-    }
-
-    @Override
-    public FileBasedSource<T> getCurrentSource() {
-      return source;
-    }
-
-    @Override
-    public FileBasedSource<T> splitAtFraction(double fraction) {
-      // Unsupported. TODO: implement.
-      LOG.debug("Dynamic splitting of FilePatternReader is unsupported.");
-      return null;
-    }
-
-    @Override
-    public Double getFractionConsumed() {
-      if (currentReader == null) {
-        return 0.0;
-      }
-      if (fileReaders.isEmpty()) {
-        return 1.0;
-      }
-      int index = fileReadersIterator.previousIndex();
-      int numReaders = fileReaders.size();
-      if (index == numReaders) {
-        return 1.0;
-      }
-      double before = 1.0 * index / numReaders;
-      double after = 1.0 * (index + 1) / numReaders;
-      Double fractionOfCurrentReader = currentReader.getFractionConsumed();
-      if (fractionOfCurrentReader == null) {
-        return before;
-      }
-      return before + fractionOfCurrentReader * (after - before);
-    }
-  }
-}
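
To illustrate the contract above: a subclass implements createForSubrangeOfFile,
createSingleFileReader, and a FileBasedReader (the Javadoc names XmlSource as the in-tree
example). The following sketch of a newline-delimited text source is illustrative only:
SimpleLineSource and SimpleLineReader are hypothetical names, the reader reads one byte at a
time for brevity, and it does not resynchronize to the first complete record of a subrange,
which a production reader must do per the "Reading Records" contract above.

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.io.FileBasedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.NoSuchElementException;

class SimpleLineSource extends FileBasedSource<String> {
  public SimpleLineSource(String fileOrPattern, long minBundleSize) {
    super(fileOrPattern, minBundleSize); // FILEPATTERN mode
  }

  private SimpleLineSource(String fileName, long minBundleSize, long start, long end) {
    super(fileName, minBundleSize, start, end); // SINGLE_FILE_OR_SUBRANGE mode
  }

  @Override
  protected FileBasedSource<String> createForSubrangeOfFile(String fileName, long start, long end) {
    // Per the contract above, delegate to the (String, long, long, long) constructor.
    return new SimpleLineSource(fileName, getMinBundleSize(), start, end);
  }

  @Override
  protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) {
    return new SimpleLineReader(this);
  }

  @Override
  public boolean producesSortedKeys(PipelineOptions options) {
    return false;
  }

  @Override
  public Coder<String> getDefaultOutputCoder() {
    return StringUtf8Coder.of();
  }

  private static class SimpleLineReader extends FileBasedReader<String> {
    private ReadableByteChannel channel;
    private long nextOffset;         // offset of the next byte to read
    private long currentOffset = -1; // starting offset of the current record
    private String current;

    SimpleLineReader(SimpleLineSource source) {
      super(source);
    }

    @Override
    protected void startReading(ReadableByteChannel channel) {
      // The base class has already positioned a seekable channel at the start offset.
      this.channel = channel;
      this.nextOffset = getCurrentSource().getStartOffset();
    }

    @Override
    protected boolean readNextRecord() throws IOException {
      ByteArrayOutputStream line = new ByteArrayOutputStream();
      ByteBuffer buf = ByteBuffer.allocate(1);
      long startOfRecord = nextOffset;
      boolean sawBytes = false;
      while (channel.read(buf) > 0) {
        sawBytes = true;
        nextOffset++;
        byte b = buf.array()[0];
        buf.clear();
        if (b == '\n') {
          break; // end of the current record
        }
        line.write(b);
      }
      if (!sawBytes) {
        return false; // end of channel reached before a new record
      }
      currentOffset = startOfRecord;
      current = new String(line.toByteArray(), StandardCharsets.UTF_8);
      return true;
    }

    @Override
    protected long getCurrentOffset() throws NoSuchElementException {
      if (currentOffset < 0) {
        throw new NoSuchElementException();
      }
      return currentOffset;
    }

    @Override
    public String getCurrent() throws NoSuchElementException {
      if (current == null) {
        throw new NoSuchElementException();
      }
      return current;
    }
  }
}

With such a subclass in place, the final splitIntoBundles and createReader implementations
above take care of expanding file patterns, splitting seekable files into subranges, and
concatenating per-file readers.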

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/OffsetBasedSource.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/OffsetBasedSource.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/OffsetBasedSource.java
deleted file mode 100644
index d581b80..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/OffsetBasedSource.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Copyright (C) 2014 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.cloud.dataflow.sdk.io.range.OffsetRangeTracker;
-import com.google.cloud.dataflow.sdk.io.range.RangeTracker;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.common.base.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-/**
- * A {@link BoundedSource} that uses offsets to define starting and ending positions.
- *
- * <p>{@link OffsetBasedSource} is a common base class for all bounded sources where the input can
- * be represented as a single range, and the input can be efficiently processed in parallel by
- * splitting the range into a set of disjoint ranges whose union is the original range. This class
- * should be used for sources that can be cheaply read starting at any given offset.
- * {@link OffsetBasedSource} stores the range and implements splitting into bundles.
- *
- * <p>Extend {@link OffsetBasedSource} to implement your own offset-based custom source.
- * {@link FileBasedSource}, which is a subclass of this, adds additional functionality useful for
-   * custom sources that are based on files. If possible, implementors should start from
- * {@link FileBasedSource} instead of {@link OffsetBasedSource}.
- *
- * <p>Consult {@link RangeTracker} for important semantics common to all sources defined by a range
- * of positions of a certain type, including the semantics of split points
- * ({@link OffsetBasedReader#isAtSplitPoint}).
- *
- * @param <T> Type of records represented by the source.
- * @see BoundedSource
- * @see FileBasedSource
- * @see RangeTracker
- */
-public abstract class OffsetBasedSource<T> extends BoundedSource<T> {
-  private final long startOffset;
-  private final long endOffset;
-  private final long minBundleSize;
-
-  /**
-   * @param startOffset starting offset (inclusive) of the source. Must be non-negative.
-   *
-   * @param endOffset ending offset (exclusive) of the source. Use {@link Long#MAX_VALUE} to
-   *        indicate that the entire source after {@code startOffset} should be read. Must be
-   *        {@code > startOffset}.
-   *
-   * @param minBundleSize minimum bundle size in offset units that should be used when splitting the
-   *                      source into sub-sources. This value may not be respected if the total
-   *                      range of the source is smaller than the specified {@code minBundleSize}.
-   *                      Must be non-negative.
-   */
-  public OffsetBasedSource(long startOffset, long endOffset, long minBundleSize) {
-    this.startOffset = startOffset;
-    this.endOffset = endOffset;
-    this.minBundleSize = minBundleSize;
-  }
-
-  /**
-   * Returns the starting offset of the source.
-   */
-  public long getStartOffset() {
-    return startOffset;
-  }
-
-  /**
-   * Returns the specified ending offset of the source. Any returned value greater than or equal to
-   * {@link #getMaxEndOffset(PipelineOptions)} should be treated as
-   * {@link #getMaxEndOffset(PipelineOptions)}.
-   */
-  public long getEndOffset() {
-    return endOffset;
-  }
-
-  /**
-   * Returns the minimum bundle size that should be used when splitting the source into sub-sources.
-   * This value may not be respected if the total range of the source is smaller than the specified
-   * {@code minBundleSize}.
-   */
-  public long getMinBundleSize() {
-    return minBundleSize;
-  }
-
-  @Override
-  public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-    long trueEndOffset = (endOffset == Long.MAX_VALUE) ? getMaxEndOffset(options) : endOffset;
-    return getBytesPerOffset() * (trueEndOffset - getStartOffset());
-  }
-
-  @Override
-  public List<? extends OffsetBasedSource<T>> splitIntoBundles(
-      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-    // Split the range into bundles based on the desiredBundleSizeBytes. The final bundle is
-    // adjusted to make sure that we do not end up with a bundle that is too small at the end. If
-    // the desired bundle size is smaller than the minBundleSize of the source, then minBundleSize
-    // will be used instead.
-
-    long desiredBundleSizeOffsetUnits = Math.max(
-        Math.max(1, desiredBundleSizeBytes / getBytesPerOffset()),
-        minBundleSize);
-
-    List<OffsetBasedSource<T>> subSources = new ArrayList<>();
-    long start = startOffset;
-    long maxEnd = Math.min(endOffset, getMaxEndOffset(options));
-
-    while (start < maxEnd) {
-      long end = start + desiredBundleSizeOffsetUnits;
-      end = Math.min(end, maxEnd);
-      // Avoid having a too small bundle at the end and ensure that we respect minBundleSize.
-      long remaining = maxEnd - end;
-      if ((remaining < desiredBundleSizeOffsetUnits / 4) || (remaining < minBundleSize)) {
-        end = maxEnd;
-      }
-      subSources.add(createSourceForSubrange(start, end));
-
-      start = end;
-    }
-    return subSources;
-  }
-
-  @Override
-  public void validate() {
-    Preconditions.checkArgument(
-        this.startOffset >= 0,
-        "Start offset has value %s, must be non-negative", this.startOffset);
-    Preconditions.checkArgument(
-        this.endOffset >= 0,
-        "End offset has value %s, must be non-negative", this.endOffset);
-    Preconditions.checkArgument(
-        this.startOffset < this.endOffset,
-        "Start offset %s must be before end offset %s",
-        this.startOffset, this.endOffset);
-    Preconditions.checkArgument(
-        this.minBundleSize >= 0,
-        "minBundleSize has value %s, must be non-negative",
-        this.minBundleSize);
-  }
-
-  @Override
-  public String toString() {
-    return "[" + startOffset + ", " + endOffset + ")";
-  }
-
-  /**
-   * Returns approximately how many bytes of data correspond to a single offset in this source.
-   * Used for translation between this source's range and methods defined in terms of bytes, such
-   * as {@link #getEstimatedSizeBytes} and {@link #splitIntoBundles}.
-   *
-   * <p>Defaults to {@code 1} byte, which is the common case for, e.g., file sources.
-   */
-  public long getBytesPerOffset() {
-    return 1L;
-  }
-
-  /**
-   * Returns the actual ending offset of the current source. The value returned by this function
-   * will be used to clip the end of the range {@code [startOffset, endOffset)} such that the
-   * range used is {@code [startOffset, min(endOffset, maxEndOffset))}.
-   *
-   * <p>As an example in which {@link OffsetBasedSource} is used to implement a file source, suppose
-   * that this source was constructed with an {@code endOffset} of {@link Long#MAX_VALUE} to
-   * indicate that a file should be read to the end. Then {@link #getMaxEndOffset} should determine
-   * the actual, exact size of the file in bytes and return it.
-   */
-  public abstract long getMaxEndOffset(PipelineOptions options) throws Exception;
-
-  /**
-   * Returns an {@link OffsetBasedSource} for a subrange of the current source. The
-   * subrange {@code [start, end)} must be within the range {@code [startOffset, endOffset)} of
-   * the current source, i.e. {@code startOffset <= start < end <= endOffset}.
-   */
-  public abstract OffsetBasedSource<T> createSourceForSubrange(long start, long end);
-
-  /**
-   * Whether this source should allow dynamic splitting of the offset ranges.
-   *
-   * <p>True by default. Override this to return false if the source cannot
-   * support dynamic splitting correctly. If this returns false,
-   * {@link OffsetBasedSource.OffsetBasedReader#splitAtFraction} will refuse all split requests.
-   */
-  public boolean allowsDynamicSplitting() {
-    return true;
-  }
-
-  /**
-   * A {@link Source.Reader} that implements code common to readers of all
-   * {@link OffsetBasedSource}s.
-   *
-   * <p>Subclasses have to implement:
-   * <ul>
-   *   <li>The methods {@link #startImpl} and {@link #advanceImpl} for reading the
-   *   first or subsequent records.
-   *   <li>The methods {@link #getCurrent}, {@link #getCurrentOffset}, and optionally
-   *   {@link #isAtSplitPoint} and {@link #getCurrentTimestamp} to access properties of
-   *   the last record successfully read by {@link #startImpl} or {@link #advanceImpl}.
-   * </ul>
-   */
-  public abstract static class OffsetBasedReader<T> extends BoundedReader<T> {
-    private static final Logger LOG = LoggerFactory.getLogger(OffsetBasedReader.class);
-
-    private OffsetBasedSource<T> source;
-
-    /** The {@link OffsetRangeTracker} managing the range and current position of the source. */
-    private final OffsetRangeTracker rangeTracker;
-
-    /**
-     * @param source the {@link OffsetBasedSource} to be read by the current reader.
-     */
-    public OffsetBasedReader(OffsetBasedSource<T> source) {
-      this.source = source;
-      this.rangeTracker = new OffsetRangeTracker(source.getStartOffset(), source.getEndOffset());
-    }
-
-    /**
-     * Returns the <i>starting</i> offset of the {@link Source.Reader#getCurrent current record},
-     * which has been read by the last successful {@link Source.Reader#start} or
-     * {@link Source.Reader#advance} call.
-     * <p>If no such call has been made yet, the return value is unspecified.
-     * <p>See {@link RangeTracker} for description of offset semantics.
-     */
-    protected abstract long getCurrentOffset() throws NoSuchElementException;
-
-    /**
-     * Returns whether the current record is at a split point (i.e., whether the current record
-     * would be the first record to be read by a source with a specified start offset of
-     * {@link #getCurrentOffset}).
-     *
-     * <p>See detailed documentation about split points in {@link RangeTracker}.
-     */
-    protected boolean isAtSplitPoint() throws NoSuchElementException {
-      return true;
-    }
-
-    @Override
-    public final boolean start() throws IOException {
-      return startImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset());
-    }
-
-    @Override
-    public final boolean advance() throws IOException {
-      return advanceImpl() && rangeTracker.tryReturnRecordAt(isAtSplitPoint(), getCurrentOffset());
-    }
-
-    /**
-     * Initializes the {@link OffsetBasedSource.OffsetBasedReader} and advances to the first record,
-     * returning {@code true} if there is a record available to be read. This method will be
-     * invoked exactly once and may perform expensive setup operations that are needed to
-     * initialize the reader.
-     *
-     * <p>This function is the {@code OffsetBasedReader} implementation of
-     * {@link BoundedReader#start}. The key difference is that the implementor can ignore the
-     * possibility that it should no longer produce the first record, either because it has exceeded
-     * the original {@code endOffset} assigned to the reader, or because a concurrent call to
-     * {@link #splitAtFraction} has changed the source to shrink the offset range being read.
-     *
-     * @see BoundedReader#start
-     */
-    protected abstract boolean startImpl() throws IOException;
-
-    /**
-     * Advances to the next record and returns {@code true}, or returns {@code false} if there is
-     * no next record.
-     *
-     * <p>This function is the {@code OffsetBasedReader} implementation of
-     * {@link BoundedReader#advance}. The key difference is that the implementor can ignore the
-     * possibility that it should no longer produce the next record, either because it has exceeded
-     * the original {@code endOffset} assigned to the reader, or because a concurrent call to
-     * {@link #splitAtFraction} has changed the source to shrink the offset range being read.
-     *
-     * @see BoundedReader#advance
-     */
-    protected abstract boolean advanceImpl() throws IOException;
-
-    @Override
-    public synchronized OffsetBasedSource<T> getCurrentSource() {
-      return source;
-    }
-
-    @Override
-    public Double getFractionConsumed() {
-      return rangeTracker.getFractionConsumed();
-    }
-
-    @Override
-    public final synchronized OffsetBasedSource<T> splitAtFraction(double fraction) {
-      if (!getCurrentSource().allowsDynamicSplitting()) {
-        return null;
-      }
-      if (rangeTracker.getStopPosition() == Long.MAX_VALUE) {
-        LOG.debug(
-            "Refusing to split unbounded OffsetBasedReader {} at fraction {}",
-            rangeTracker, fraction);
-        return null;
-      }
-      long splitOffset = rangeTracker.getPositionForFractionConsumed(fraction);
-      LOG.debug(
-          "Proposing to split OffsetBasedReader {} at fraction {} (offset {})",
-          rangeTracker, fraction, splitOffset);
-      if (!rangeTracker.trySplitAtPosition(splitOffset)) {
-        return null;
-      }
-      long start = source.getStartOffset();
-      long end = source.getEndOffset();
-      OffsetBasedSource<T> primary = source.createSourceForSubrange(start, splitOffset);
-      OffsetBasedSource<T> residual = source.createSourceForSubrange(splitOffset, end);
-      this.source = primary;
-      return residual;
-    }
-  }
-}
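
The loop in splitIntoBundles above merges the final bundle into its predecessor whenever the
remainder would be smaller than a quarter of the desired bundle size or smaller than
minBundleSize. A standalone restatement of that arithmetic makes the rule concrete; it
assumes getBytesPerOffset() == 1, and the class name and values are illustrative only.

import java.util.ArrayList;
import java.util.List;

public class BundleSplitDemo {
  // Mirrors the bundle-boundary loop of OffsetBasedSource.splitIntoBundles.
  static List<long[]> split(long start, long maxEnd, long desiredBundleSize, long minBundleSize) {
    long desiredUnits = Math.max(Math.max(1, desiredBundleSize), minBundleSize);
    List<long[]> bundles = new ArrayList<>();
    while (start < maxEnd) {
      long end = Math.min(start + desiredUnits, maxEnd);
      long remaining = maxEnd - end;
      // Tail-merging rule: fold a too-small final bundle into the current one.
      if (remaining < desiredUnits / 4 || remaining < minBundleSize) {
        end = maxEnd;
      }
      bundles.add(new long[] {start, end});
      start = end;
    }
    return bundles;
  }

  public static void main(String[] args) {
    // Range [0, 1010) with a desired bundle size of 100 yields
    // [0, 100), [100, 200), ..., [800, 900), [900, 1010): the trailing 10 offsets
    // (less than 100 / 4) are merged into the last bundle rather than emitted alone.
    for (long[] b : split(0, 1010, 100, 1)) {
      System.out.println("[" + b[0] + ", " + b[1] + ")");
    }
  }
}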

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java
deleted file mode 100644
index 653b31f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java
+++ /dev/null
@@ -1,1044 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import static com.google.common.base.MoreObjects.firstNonNull;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.api.client.util.Clock;
-import com.google.api.client.util.DateTime;
-import com.google.api.services.pubsub.Pubsub;
-import com.google.api.services.pubsub.model.AcknowledgeRequest;
-import com.google.api.services.pubsub.model.PublishRequest;
-import com.google.api.services.pubsub.model.PubsubMessage;
-import com.google.api.services.pubsub.model.PullRequest;
-import com.google.api.services.pubsub.model.PullResponse;
-import com.google.api.services.pubsub.model.ReceivedMessage;
-import com.google.api.services.pubsub.model.Subscription;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
-import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
-import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
-import com.google.cloud.dataflow.sdk.util.CoderUtils;
-import com.google.cloud.dataflow.sdk.util.Transport;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
-import com.google.cloud.dataflow.sdk.values.PDone;
-import com.google.cloud.dataflow.sdk.values.PInput;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nullable;
-
-/**
- * Read and Write {@link PTransform}s for Cloud Pub/Sub streams. These transforms create
- * and consume unbounded {@link PCollection PCollections}.
- *
- * <h3>Permissions</h3>
- * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
- * Dataflow job. Please refer to the documentation of corresponding
- * {@link PipelineRunner PipelineRunners} for more details.
- */
-public class PubsubIO {
-  private static final Logger LOG = LoggerFactory.getLogger(PubsubIO.class);
-
-  /** The default {@link Coder} used to translate to/from Cloud Pub/Sub messages. */
-  public static final Coder<String> DEFAULT_PUBSUB_CODER = StringUtf8Coder.of();
-
-  /**
-   * Project IDs must contain 6-63 lowercase letters, digits, or dashes.
-   * IDs must start with a letter and may not end with a dash.
-   * This regex isn't exact - it allows patterns that would be rejected by
-   * the service, but it is sufficient for basic validation of project IDs.
-   */
-  private static final Pattern PROJECT_ID_REGEXP =
-      Pattern.compile("[a-z][-a-z0-9:.]{4,61}[a-z0-9]");
-
-  private static final Pattern SUBSCRIPTION_REGEXP =
-      Pattern.compile("projects/([^/]+)/subscriptions/(.+)");
-
-  private static final Pattern TOPIC_REGEXP = Pattern.compile("projects/([^/]+)/topics/(.+)");
-
-  private static final Pattern V1BETA1_SUBSCRIPTION_REGEXP =
-      Pattern.compile("/subscriptions/([^/]+)/(.+)");
-
-  private static final Pattern V1BETA1_TOPIC_REGEXP = Pattern.compile("/topics/([^/]+)/(.+)");
-
-  private static final Pattern PUBSUB_NAME_REGEXP = Pattern.compile("[a-zA-Z][-._~%+a-zA-Z0-9]+");
-
-  private static final int PUBSUB_NAME_MAX_LENGTH = 255;
-
-  private static final String SUBSCRIPTION_RANDOM_TEST_PREFIX = "_random/";
-  private static final String SUBSCRIPTION_STARTING_SIGNAL = "_starting_signal/";
-  private static final String TOPIC_DEV_NULL_TEST_NAME = "/topics/dev/null";
-
-  private static void validateProjectName(String project) {
-    Matcher match = PROJECT_ID_REGEXP.matcher(project);
-    if (!match.matches()) {
-      throw new IllegalArgumentException(
-          "Illegal project name specified in Pubsub subscription: " + project);
-    }
-  }
-
-  private static void validatePubsubName(String name) {
-    if (name.length() > PUBSUB_NAME_MAX_LENGTH) {
-      throw new IllegalArgumentException(
-          "Pubsub object name is longer than 255 characters: " + name);
-    }
-
-    if (name.startsWith("goog")) {
-      throw new IllegalArgumentException("Pubsub object name cannot start with goog: " + name);
-    }
-
-    Matcher match = PUBSUB_NAME_REGEXP.matcher(name);
-    if (!match.matches()) {
-      throw new IllegalArgumentException("Illegal Pubsub object name specified: " + name
-          + " Please see Javadoc for naming rules.");
-    }
-  }
-
-  /**
-   * Returns the {@link Instant} that corresponds to the timestamp in the supplied
-   * {@link PubsubMessage} under the specified {@code label}. See
-   * {@link PubsubIO.Read#timestampLabel(String)} for details about how these messages are
-   * parsed.
-   *
-   * <p>The {@link Clock} parameter is used to virtualize time for testing.
-   *
-   * @throws IllegalArgumentException if the timestamp label is provided, but there is no
-   *     corresponding attribute in the message or the value provided is not a valid timestamp
-   *     string.
-   * @see PubsubIO.Read#timestampLabel(String)
-   */
-  @VisibleForTesting
-  protected static Instant assignMessageTimestamp(
-      PubsubMessage message, @Nullable String label, Clock clock) {
-    if (label == null) {
-      return new Instant(clock.currentTimeMillis());
-    }
-
-    // Extract message attributes, defaulting to empty map if null.
-    Map<String, String> attributes = firstNonNull(
-        message.getAttributes(), ImmutableMap.<String, String>of());
-
-    String timestampStr = attributes.get(label);
-    checkArgument(timestampStr != null && !timestampStr.isEmpty(),
-        "PubSub message is missing a timestamp in label: %s", label);
-
-    long millisSinceEpoch;
-    try {
-      // Try parsing as milliseconds since epoch; a string in RFC 3339 format cannot be parsed as
-      // a long. If parsing fails with an IllegalArgumentException (NumberFormatException is a
-      // subclass of it), we fall back to parsing as an RFC 3339 string.
-      millisSinceEpoch = Long.parseLong(timestampStr);
-    } catch (IllegalArgumentException e) {
-      // Try parsing as RFC3339 string. DateTime.parseRfc3339 will throw an IllegalArgumentException
-      // if parsing fails, and the caller should handle.
-      millisSinceEpoch = DateTime.parseRfc3339(timestampStr).getValue();
-    }
-    return new Instant(millisSinceEpoch);
-  }
-
-  /**
-   * Class representing a Cloud Pub/Sub Subscription.
-   */
-  public static class PubsubSubscription implements Serializable {
-    private enum Type { NORMAL, FAKE }
-
-    private final Type type;
-    private final String project;
-    private final String subscription;
-
-    private PubsubSubscription(Type type, String project, String subscription) {
-      this.type = type;
-      this.project = project;
-      this.subscription = subscription;
-    }
-
-    /**
-     * Creates a class representing a Pub/Sub subscription from the specified subscription path.
-     *
-     * <p>Cloud Pub/Sub subscription names should be of the form
-     * {@code projects/<project>/subscriptions/<subscription>}, where {@code <project>} is the name
-     * of the project the subscription belongs to. The {@code <subscription>} component must comply
-     * with the following requirements:
-     *
-     * <ul>
-     * <li>Can only contain lowercase letters, numbers, dashes ('-'), underscores ('_') and periods
-     * ('.').</li>
-     * <li>Must be between 3 and 255 characters.</li>
-     * <li>Must begin with a letter.</li>
-     * <li>Must end with a letter or a number.</li>
-     * <li>Cannot begin with {@code 'goog'} prefix.</li>
-     * </ul>
-     */
-    public static PubsubSubscription fromPath(String path) {
-      if (path.startsWith(SUBSCRIPTION_RANDOM_TEST_PREFIX)
-          || path.startsWith(SUBSCRIPTION_STARTING_SIGNAL)) {
-        return new PubsubSubscription(Type.FAKE, "", path);
-      }
-
-      String projectName, subscriptionName;
-
-      Matcher v1beta1Match = V1BETA1_SUBSCRIPTION_REGEXP.matcher(path);
-      if (v1beta1Match.matches()) {
-        LOG.warn("Saw subscription in v1beta1 format. Subscriptions should be in the format "
-            + "projects/<project_id>/subscriptions/<subscription_name>");
-        projectName = v1beta1Match.group(1);
-        subscriptionName = v1beta1Match.group(2);
-      } else {
-        Matcher match = SUBSCRIPTION_REGEXP.matcher(path);
-        if (!match.matches()) {
-          throw new IllegalArgumentException("Pubsub subscription is not in "
-              + "projects/<project_id>/subscriptions/<subscription_name> format: " + path);
-        }
-        projectName = match.group(1);
-        subscriptionName = match.group(2);
-      }
-
-      validateProjectName(projectName);
-      validatePubsubName(subscriptionName);
-      return new PubsubSubscription(Type.NORMAL, projectName, subscriptionName);
-    }
-
-    /**
-     * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub
-     * v1beta1 API.
-     *
-     * @deprecated the v1beta1 API for Cloud Pub/Sub is deprecated.
-     */
-    @Deprecated
-    public String asV1Beta1Path() {
-      if (type == Type.NORMAL) {
-        return "/subscriptions/" + project + "/" + subscription;
-      } else {
-        return subscription;
-      }
-    }
-
-    /**
-     * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub
-     * v1beta2 API.
-     *
-     * @deprecated the v1beta2 API for Cloud Pub/Sub is deprecated.
-     */
-    @Deprecated
-    public String asV1Beta2Path() {
-      if (type == Type.NORMAL) {
-        return "projects/" + project + "/subscriptions/" + subscription;
-      } else {
-        return subscription;
-      }
-    }
-
-    /**
-     * Returns the string representation of this subscription as a path used in the Cloud Pub/Sub
-     * API.
-     */
-    public String asPath() {
-      if (type == Type.NORMAL) {
-        return "projects/" + project + "/subscriptions/" + subscription;
-      } else {
-        return subscription;
-      }
-    }
-  }
-
-  /**
-   * Class representing a Cloud Pub/Sub Topic.
-   */
-  public static class PubsubTopic implements Serializable {
-    private enum Type { NORMAL, FAKE }
-
-    private final Type type;
-    private final String project;
-    private final String topic;
-
-    private PubsubTopic(Type type, String project, String topic) {
-      this.type = type;
-      this.project = project;
-      this.topic = topic;
-    }
-
-    /**
-     * Creates a class representing a Cloud Pub/Sub topic from the specified topic path.
-     *
-     * <p>Cloud Pub/Sub topic names should be of the form
-     * {@code /topics/<project>/<topic>}, where {@code <project>} is the name of
-     * the publishing project. The {@code <topic>} component must comply with
-     * the following requirements:
-     *
-     * <ul>
-     * <li>Can only contain lowercase letters, numbers, dashes ('-'), underscores ('_') and periods
-     * ('.').</li>
-     * <li>Must be between 3 and 255 characters.</li>
-     * <li>Must begin with a letter.</li>
-     * <li>Must end with a letter or a number.</li>
-     * <li>Cannot begin with 'goog' prefix.</li>
-     * </ul>
-     */
-    public static PubsubTopic fromPath(String path) {
-      if (path.equals(TOPIC_DEV_NULL_TEST_NAME)) {
-        return new PubsubTopic(Type.FAKE, "", path);
-      }
-
-      String projectName, topicName;
-
-      Matcher v1beta1Match = V1BETA1_TOPIC_REGEXP.matcher(path);
-      if (v1beta1Match.matches()) {
-        LOG.warn("Saw topic in v1beta1 format.  Topics should be in the format "
-            + "projects/<project_id>/topics/<topic_name>");
-        projectName = v1beta1Match.group(1);
-        topicName = v1beta1Match.group(2);
-      } else {
-        Matcher match = TOPIC_REGEXP.matcher(path);
-        if (!match.matches()) {
-          throw new IllegalArgumentException(
-              "Pubsub topic is not in projects/<project_id>/topics/<topic_name> format: " + path);
-        }
-        projectName = match.group(1);
-        topicName = match.group(2);
-      }
-
-      validateProjectName(projectName);
-      validatePubsubName(topicName);
-      return new PubsubTopic(Type.NORMAL, projectName, topicName);
-    }
-
-    /**
-     * Returns the string representation of this topic as a path used in the Cloud Pub/Sub
-     * v1beta1 API.
-     *
-     * @deprecated the v1beta1 API for Cloud Pub/Sub is deprecated.
-     */
-    @Deprecated
-    public String asV1Beta1Path() {
-      if (type == Type.NORMAL) {
-        return "/topics/" + project + "/" + topic;
-      } else {
-        return topic;
-      }
-    }
-
-    /**
-     * Returns the string representation of this topic as a path used in the Cloud Pub/Sub
-     * v1beta2 API.
-     *
-     * @deprecated the v1beta2 API for Cloud Pub/Sub is deprecated.
-     */
-    @Deprecated
-    public String asV1Beta2Path() {
-      if (type == Type.NORMAL) {
-        return "projects/" + project + "/topics/" + topic;
-      } else {
-        return topic;
-      }
-    }
-
-    /**
-     * Returns the string representation of this topic as a path used in the Cloud Pub/Sub
-     * API.
-     */
-    public String asPath() {
-      if (type == Type.NORMAL) {
-        return "projects/" + project + "/topics/" + topic;
-      } else {
-        return topic;
-      }
-    }
-  }
-
-  /**
-   * A {@link PTransform} that continuously reads from a Cloud Pub/Sub stream and
-   * returns a {@link PCollection} of {@link String Strings} containing the items from
-   * the stream.
-   *
-   * <p>When running with a {@link PipelineRunner} that only supports bounded
-   * {@link PCollection PCollections} (such as {@link DirectPipelineRunner} or
-   * {@link DataflowPipelineRunner} without {@code --streaming}), only a bounded portion of the
-   * input Pub/Sub stream can be processed. As such, either {@link Bound#maxNumRecords(int)} or
-   * {@link Bound#maxReadTime(Duration)} must be set.
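-   *
-   * <p>A typical usage sketch, assuming {@code pipeline} is an existing {@code Pipeline}
-   * (names are illustrative):
-   * <pre>{@code
-   * PCollection<String> messages = pipeline.apply(
-   *     PubsubIO.Read.named("ReadFromPubsub")
-   *         .topic("projects/my-project/topics/my-topic"));
-   * }</pre>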
-   */
-  public static class Read {
-    /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub with the specified transform
-     * name.
-     */
-    public static Bound<String> named(String name) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).named(name);
-    }
-
-    /**
-     * Creates and returns a transform for reading from a Cloud Pub/Sub topic. Mutually exclusive
-     * with {@link #subscription(String)}.
-     *
-     * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format
-     * of the {@code topic} string.
-     *
-     * <p>Dataflow will start reading data published on this topic from the time the pipeline is
-     * started. Any data published on the topic before the pipeline is started will not be read by
-     * Dataflow.
-     */
-    public static Bound<String> topic(String topic) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).topic(topic);
-    }
-
-    /**
-     * Creates and returns a transform for reading from a specific Cloud Pub/Sub subscription.
-     * Mutually exclusive with {@link #topic(String)}.
-     *
-     * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format
-     * of the {@code subscription} string.
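-     *
-     * <p>For example (names are illustrative):
-     * <pre>{@code
-     * PubsubIO.Read.subscription("projects/my-project/subscriptions/my-subscription");
-     * }</pre>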
-     */
-    public static Bound<String> subscription(String subscription) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).subscription(subscription);
-    }
-
-    /**
-     * Creates and returns a transform reading from Cloud Pub/Sub where record timestamps are
-     * expected to be provided as Pub/Sub message attributes. The {@code timestampLabel}
-     * parameter specifies the name of the attribute that contains the timestamp.
-     *
-     * <p>The timestamp value is expected to be represented in the attribute as either:
-     *
-     * <ul>
-     * <li>a numerical value representing the number of milliseconds since the Unix epoch. For
-     * example, if using the Joda time classes, {@link Instant#getMillis()} returns the correct
-     * value for this attribute.
-     * <li>a String in RFC 3339 format. For example, {@code 2015-10-29T23:41:41.123Z}. The
-     * sub-second component of the timestamp is optional, and digits beyond the first three
-     * (i.e., time units smaller than milliseconds) will be ignored.
-     * </ul>
-     *
-     * <p>If {@code timestampLabel} is not provided, the system will generate record timestamps
-     * the first time it sees each record. All windowing will be done relative to these timestamps.
-     *
-     * <p>By default, windows are emitted based on an estimate of when this source is likely
-     * done producing data for a given timestamp (referred to as the Watermark; see
-     * {@link AfterWatermark} for more details). Any late data will be handled by the trigger
-     * specified with the windowing strategy &ndash; by default it will be output immediately.
-     *
-     * <p>Note that the system can guarantee that no late data will ever be seen when it assigns
-     * timestamps by arrival time (i.e. {@code timestampLabel} is not provided).
-     *
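-     * <p>For example, with an illustrative attribute name of {@code "ts"}:
-     * <pre>{@code
-     * PubsubIO.Read.timestampLabel("ts")
-     *     .subscription("projects/my-project/subscriptions/my-subscription");
-     * }</pre>
-     *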
-     * @see <a href="https://www.ietf.org/rfc/rfc3339.txt">RFC 3339</a>
-     */
-    public static Bound<String> timestampLabel(String timestampLabel) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).timestampLabel(timestampLabel);
-    }
-
-    /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub where unique record
-     * identifiers are expected to be provided as Pub/Sub message attributes. The {@code idLabel}
-     * parameter specifies the attribute name. The value of the attribute can be any string
-     * that uniquely identifies this record.
-     *
-     * <p>If {@code idLabel} is not provided, Dataflow cannot guarantee that no duplicate data will
-     * be delivered on the Pub/Sub stream. In this case, deduplication of the stream will be
-     * strictly best effort.
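-     *
-     * <p>For example, with an illustrative attribute name:
-     * <pre>{@code
-     * PubsubIO.Read.idLabel("unique_message_id");
-     * }</pre>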
-     */
-    public static Bound<String> idLabel(String idLabel) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).idLabel(idLabel);
-    }
-
-    /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub that uses the given
-     * {@link Coder} to decode Pub/Sub messages into a value of type {@code T}.
-     *
-     * <p>By default, uses {@link StringUtf8Coder}, which just
-     * returns the text lines as Java strings.
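-     *
-     * <p>For example, to make the default explicit:
-     * <pre>{@code
-     * PubsubIO.Read.Bound<String> read = PubsubIO.Read.withCoder(StringUtf8Coder.of());
-     * }</pre>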
-     *
-     * @param <T> the type of the decoded elements, and the elements
-     * of the resulting PCollection.
-     */
-    public static <T> Bound<T> withCoder(Coder<T> coder) {
-      return new Bound<>(coder);
-    }
-
-    /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum number of
-     * records that will be read. The transform produces a <i>bounded</i> {@link PCollection}.
-     *
-     * <p>Either this option or {@link #maxReadTime(Duration)} must be set in order to create a
-     * bounded source.
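-     *
-     * <p>For example (topic name is illustrative), to read at most 1000 records as a bounded
-     * {@link PCollection}:
-     * <pre>{@code
-     * PubsubIO.Read.topic("projects/my-project/topics/my-topic")
-     *     .maxNumRecords(1000);
-     * }</pre>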
-     */
-    public static Bound<String> maxNumRecords(int maxNumRecords) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).maxNumRecords(maxNumRecords);
-    }
-
-    /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub with a maximum
-     * duration during which records will be read.  The transform produces a <i>bounded</i>
-     * {@link PCollection}.
-     *
-     * <p>Either this option or {@link #maxNumRecords(int)} must be set in order to create a bounded
-     * source.
-     */
-    public static Bound<String> maxReadTime(Duration maxReadTime) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).maxReadTime(maxReadTime);
-    }
-
-    /**
-     * A {@link PTransform} that reads from a Cloud Pub/Sub source and returns
-     * an unbounded {@link PCollection} containing the items from the stream.
-     */
-    public static class Bound<T> extends PTransform<PInput, PCollection<T>> {
-      /** The Cloud Pub/Sub topic to read from. */
-      @Nullable private final PubsubTopic topic;
-
-      /** The Cloud Pub/Sub subscription to read from. */
-      @Nullable private final PubsubSubscription subscription;
-
-      /** The name of the message attribute to read timestamps from. */
-      @Nullable private final String timestampLabel;
-
-      /** The name of the message attribute to read unique message IDs from. */
-      @Nullable private final String idLabel;
-
-      /** The coder used to decode each record. */
-      @Nullable private final Coder<T> coder;
-
-      /** Stop after reading this many records. */
-      private final int maxNumRecords;
-
-      /** Stop after reading for this much time. */
-      @Nullable private final Duration maxReadTime;
-
-      private Bound(Coder<T> coder) {
-        this(null, null, null, null, coder, null, 0, null);
-      }
-
-      private Bound(String name, PubsubSubscription subscription, PubsubTopic topic,
-          String timestampLabel, Coder<T> coder, String idLabel, int maxNumRecords,
-          Duration maxReadTime) {
-        super(name);
-        this.subscription = subscription;
-        this.topic = topic;
-        this.timestampLabel = timestampLabel;
-        this.coder = coder;
-        this.idLabel = idLabel;
-        this.maxNumRecords = maxNumRecords;
-        this.maxReadTime = maxReadTime;
-      }
-
-      /**
-       * Returns a transform that's like this one but with the given step name.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> named(String name) {
-        return new Bound<>(
-            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
-      }
-
-      /**
-       * Returns a transform that's like this one but reading from the
-       * given subscription.
-       *
-       * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format
-       * of the {@code subscription} string.
-       *
-       * <p>Multiple readers reading from the same subscription will each receive
-       * some arbitrary portion of the data.  Most likely, separate readers should
-       * use their own subscriptions.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> subscription(String subscription) {
-        return new Bound<>(name, PubsubSubscription.fromPath(subscription), topic, timestampLabel,
-            coder, idLabel, maxNumRecords, maxReadTime);
-      }
-
-      /**
-       * Returns a transform that's like this one but that reads from the specified topic.
-       *
-       * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the
-       * format of the {@code topic} string.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> topic(String topic) {
-        return new Bound<>(name, subscription, PubsubTopic.fromPath(topic), timestampLabel, coder,
-            idLabel, maxNumRecords, maxReadTime);
-      }
-
-      /**
-       * Returns a transform that's like this one but that reads message timestamps
-       * from the given message attribute. See {@link PubsubIO.Read#timestampLabel(String)} for
-       * more details on the format of the timestamp attribute.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> timestampLabel(String timestampLabel) {
-        return new Bound<>(
-            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
-      }
-
-      /**
-       * Returns a transform that's like this one but that reads unique message IDs
-       * from the given message attribute. See {@link PubsubIO.Read#idLabel(String)} for more
-       * details on the format of the ID attribute.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> idLabel(String idLabel) {
-        return new Bound<>(
-            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
-      }
-
-      /**
-       * Returns a transform that's like this one but that uses the given
-       * {@link Coder} to decode each record into a value of type {@code X}.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param <X> the type of the decoded elements, and the
-       * elements of the resulting PCollection.
-       */
-      public <X> Bound<X> withCoder(Coder<X> coder) {
-        return new Bound<>(
-            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
-      }
-
-      /**
-       * Returns a transform that's like this one but will only read up to the specified
-       * maximum number of records from Cloud Pub/Sub. The transform produces a <i>bounded</i>
-       * {@link PCollection}. See {@link PubsubIO.Read#maxNumRecords(int)} for more details.
-       */
-      public Bound<T> maxNumRecords(int maxNumRecords) {
-        return new Bound<>(
-            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
-      }
-
-      /**
-       * Returns a transform that's like this one but will only read during the specified
-       * duration from Cloud Pub/Sub. The transform produces a <i>bounded</i> {@link PCollection}.
-       * See {@link PubsubIO.Read#maxReadTime(Duration)} for more details.
-       */
-      public Bound<T> maxReadTime(Duration maxReadTime) {
-        return new Bound<>(
-            name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
-      }
-
-      @Override
-      public PCollection<T> apply(PInput input) {
-        if (topic == null && subscription == null) {
-          throw new IllegalStateException("need to set either the topic or the subscription for "
-              + "a PubsubIO.Read transform");
-        }
-        if (topic != null && subscription != null) {
-          throw new IllegalStateException("Can't set both the topic and the subscription for a "
-              + "PubsubIO.Read transform");
-        }
-
-        boolean boundedOutput = getMaxNumRecords() > 0 || getMaxReadTime() != null;
-
-        if (boundedOutput) {
-          return input.getPipeline().begin()
-              .apply(Create.of((Void) null)).setCoder(VoidCoder.of())
-              .apply(ParDo.of(new PubsubReader())).setCoder(coder);
-        } else {
-          return PCollection.<T>createPrimitiveOutputInternal(
-                  input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
-              .setCoder(coder);
-        }
-      }
-
-      @Override
-      protected Coder<T> getDefaultOutputCoder() {
-        return coder;
-      }
-
-      public PubsubTopic getTopic() {
-        return topic;
-      }
-
-      public PubsubSubscription getSubscription() {
-        return subscription;
-      }
-
-      public String getTimestampLabel() {
-        return timestampLabel;
-      }
-
-      public Coder<T> getCoder() {
-        return coder;
-      }
-
-      public String getIdLabel() {
-        return idLabel;
-      }
-
-      public int getMaxNumRecords() {
-        return maxNumRecords;
-      }
-
-      public Duration getMaxReadTime() {
-        return maxReadTime;
-      }
-
-      private class PubsubReader extends DoFn<Void, T> {
-        private static final int DEFAULT_PULL_SIZE = 100;
-
-        @Override
-        public void processElement(ProcessContext c) throws IOException {
-          Pubsub pubsubClient =
-              Transport.newPubsubClient(c.getPipelineOptions().as(DataflowPipelineOptions.class))
-                  .build();
-
-          String subscription;
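-          // No subscription was specified: create a temporary, randomly named subscription
-          // on the topic. It is deleted again in the finally block below.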
-          if (getSubscription() == null) {
-            String topic = getTopic().asPath();
-            String[] split = topic.split("/");
-            subscription =
-                "projects/" + split[1] + "/subscriptions/" + split[3] + "_dataflow_"
-                + new Random().nextLong();
-            Subscription subInfo = new Subscription().setAckDeadlineSeconds(60).setTopic(topic);
-            try {
-              pubsubClient.projects().subscriptions().create(subscription, subInfo).execute();
-            } catch (Exception e) {
-              throw new RuntimeException("Failed to create subscription: ", e);
-            }
-          } else {
-            subscription = getSubscription().asPath();
-          }
-
-          Instant endTime = (getMaxReadTime() == null)
-              ? new Instant(Long.MAX_VALUE) : Instant.now().plus(getMaxReadTime());
-
-          List<PubsubMessage> messages = new ArrayList<>();
-
-          Throwable finallyBlockException = null;
-          try {
-            while ((getMaxNumRecords() == 0 || messages.size() < getMaxNumRecords())
-                && Instant.now().isBefore(endTime)) {
-              PullRequest pullRequest = new PullRequest().setReturnImmediately(false);
-              if (getMaxNumRecords() > 0) {
-                pullRequest.setMaxMessages(getMaxNumRecords() - messages.size());
-              } else {
-                pullRequest.setMaxMessages(DEFAULT_PULL_SIZE);
-              }
-
-              PullResponse pullResponse =
-                  pubsubClient.projects().subscriptions().pull(subscription, pullRequest).execute();
-              List<String> ackIds = new ArrayList<>();
-              if (pullResponse.getReceivedMessages() != null) {
-                for (ReceivedMessage received : pullResponse.getReceivedMessages()) {
-                  messages.add(received.getMessage());
-                  ackIds.add(received.getAckId());
-                }
-              }
-
-              if (ackIds.size() != 0) {
-                AcknowledgeRequest ackRequest = new AcknowledgeRequest().setAckIds(ackIds);
-                pubsubClient.projects()
-                    .subscriptions()
-                    .acknowledge(subscription, ackRequest)
-                    .execute();
-              }
-            }
-          } catch (IOException e) {
-            throw new RuntimeException("Unexpected exception while reading from Pubsub: ", e);
-          } finally {
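-            // A non-null topic means the subscription was created above; clean it up here.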
-            if (getTopic() != null) {
-              try {
-                pubsubClient.projects().subscriptions().delete(subscription).execute();
-              } catch (IOException e) {
-                finallyBlockException = new RuntimeException("Failed to delete subscription: ", e);
-                LOG.error("Failed to delete subscription: ", e);
-              }
-            }
-          }
-          if (finallyBlockException != null) {
-            Throwables.propagate(finallyBlockException);
-          }
-
-          for (PubsubMessage message : messages) {
-            c.outputWithTimestamp(
-                CoderUtils.decodeFromByteArray(getCoder(), message.decodeData()),
-                assignMessageTimestamp(message, getTimestampLabel(), Clock.SYSTEM));
-          }
-        }
-      }
-    }
-
-    /** Disallow construction of utility class. */
-    private Read() {}
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /** Disallow construction of utility class. */
-  private PubsubIO() {}
-
-  /**
-   * A {@link PTransform} that continuously writes a
-   * {@link PCollection} of {@link String Strings} to a Cloud Pub/Sub stream.
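-   *
-   * <p>A typical usage sketch, assuming {@code messages} is an existing
-   * {@code PCollection<String>} (names are illustrative):
-   * <pre>{@code
-   * messages.apply(PubsubIO.Write.named("WriteToPubsub")
-   *     .topic("projects/my-project/topics/my-topic"));
-   * }</pre>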
-   */
-  // TODO: Support non-String encodings.
-  public static class Write {
-    /**
-     * Creates a transform that writes to Pub/Sub with the given step name.
-     */
-    public static Bound<String> named(String name) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).named(name);
-    }
-
-    /**
-     * Creates a transform that publishes to the specified topic.
-     *
-     * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
-     * {@code topic} string.
-     */
-    public static Bound<String> topic(String topic) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).topic(topic);
-    }
-
-    /**
-     * Creates a transform that writes to Pub/Sub, adding each record's timestamp to the published
-     * messages in an attribute with the specified name. The value of the attribute will be a number
-     * representing the number of milliseconds since the Unix epoch. For example, if using the Joda
-     * time classes, {@link Instant#Instant(long)} can be used to parse this value.
-     *
-     * <p>If the output from this sink is being read by another Dataflow source, then
-     * {@link PubsubIO.Read#timestampLabel(String)} can be used to ensure the other source reads
-     * these timestamps from the appropriate attribute.
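-     *
-     * <p>For example, to round-trip timestamps through Pub/Sub using an illustrative
-     * attribute name of {@code "ts"}:
-     * <pre>{@code
-     * // Writing side:
-     * PubsubIO.Write.topic("projects/my-project/topics/my-topic").timestampLabel("ts");
-     * // Reading side:
-     * PubsubIO.Read.topic("projects/my-project/topics/my-topic").timestampLabel("ts");
-     * }</pre>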
-     */
-    public static Bound<String> timestampLabel(String timestampLabel) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).timestampLabel(timestampLabel);
-    }
-
-    /**
-     * Creates a transform that writes to Pub/Sub, adding each record's unique identifier to the
-     * published messages in an attribute with the specified name. The value of the attribute is an
-     * opaque string.
-     *
-     * <p>If the output from this sink is being read by another Dataflow source, then
-     * {@link PubsubIO.Read#idLabel(String)} can be used to ensure that the other source reads
-     * these unique identifiers from the appropriate attribute.
-     */
-    public static Bound<String> idLabel(String idLabel) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).idLabel(idLabel);
-    }
-
-    /**
-     * Creates a transform that uses the given {@link Coder} to encode each of the
-     * elements of the input collection into an output message.
-     *
-     * <p>By default, uses {@link StringUtf8Coder}, which writes input Java strings directly as
-     * records.
-     *
-     * @param <T> the type of the elements of the input PCollection
-     */
-    public static <T> Bound<T> withCoder(Coder<T> coder) {
-      return new Bound<>(coder);
-    }
-
-    /**
-     * A {@link PTransform} that writes an unbounded {@link PCollection} of records
-     * to a Cloud Pub/Sub stream.
-     */
-    public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
-      /** The Cloud Pub/Sub topic to publish to. */
-      @Nullable private final PubsubTopic topic;
-      /** The name of the message attribute to publish message timestamps in. */
-      @Nullable private final String timestampLabel;
-      /** The name of the message attribute to publish unique message IDs in. */
-      @Nullable private final String idLabel;
-      private final Coder<T> coder;
-
-      private Bound(Coder<T> coder) {
-        this(null, null, null, null, coder);
-      }
-
-      private Bound(
-          String name, PubsubTopic topic, String timestampLabel, String idLabel, Coder<T> coder) {
-        super(name);
-        this.topic = topic;
-        this.timestampLabel = timestampLabel;
-        this.idLabel = idLabel;
-        this.coder = coder;
-      }
-
-      /**
-       * Returns a new transform that's like this one but with the specified step
-       * name.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> named(String name) {
-        return new Bound<>(name, topic, timestampLabel, idLabel, coder);
-      }
-
-      /**
-       * Returns a new transform that's like this one but that writes to the specified
-       * topic.
-       *
-       * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
-       * {@code topic} string.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> topic(String topic) {
-        return new Bound<>(name, PubsubTopic.fromPath(topic), timestampLabel, idLabel, coder);
-      }
-
-      /**
-       * Returns a new transform that's like this one but that publishes record timestamps
-       * to a message attribute with the specified name. See
-       * {@link PubsubIO.Write#timestampLabel(String)} for more details.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> timestampLabel(String timestampLabel) {
-        return new Bound<>(name, topic, timestampLabel, idLabel, coder);
-      }
-
-      /**
-       * Returns a new transform that's like this one but that publishes unique record IDs
-       * to a message attribute with the specified name. See {@link PubsubIO.Write#idLabel(String)}
-       * for more details.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound<T> idLabel(String idLabel) {
-        return new Bound<>(name, topic, timestampLabel, idLabel, coder);
-      }
-
-      /**
-       * Returns a new transform that's like this one
-       * but that uses the given {@link Coder} to encode each of
-       * the elements of the input {@link PCollection} into an
-       * output record.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param <X> the type of the elements of the input {@link PCollection}
-       */
-      public <X> Bound<X> withCoder(Coder<X> coder) {
-        return new Bound<>(name, topic, timestampLabel, idLabel, coder);
-      }
-
-      @Override
-      public PDone apply(PCollection<T> input) {
-        if (topic == null) {
-          throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
-        }
-        input.apply(ParDo.of(new PubsubWriter()));
-        return PDone.in(input.getPipeline());
-      }
-
-      @Override
-      protected Coder<Void> getDefaultOutputCoder() {
-        return VoidCoder.of();
-      }
-
-      public PubsubTopic getTopic() {
-        return topic;
-      }
-
-      public String getTimestampLabel() {
-        return timestampLabel;
-      }
-
-      public String getIdLabel() {
-        return idLabel;
-      }
-
-      public Coder<T> getCoder() {
-        return coder;
-      }
-
-      private class PubsubWriter extends DoFn<T, Void> {
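-        // Buffers messages and publishes them in batches of at most MAX_PUBLISH_BATCH_SIZE,
-        // flushing any remainder in finishBundle.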
-        private static final int MAX_PUBLISH_BATCH_SIZE = 100;
-        private transient List<PubsubMessage> output;
-        private transient Pubsub pubsubClient;
-
-        @Override
-        public void startBundle(Context c) {
-          this.output = new ArrayList<>();
-          this.pubsubClient =
-              Transport.newPubsubClient(c.getPipelineOptions().as(DataflowPipelineOptions.class))
-                  .build();
-        }
-
-        @Override
-        public void processElement(ProcessContext c) throws IOException {
-          PubsubMessage message =
-              new PubsubMessage().encodeData(CoderUtils.encodeToByteArray(getCoder(), c.element()));
-          if (getTimestampLabel() != null) {
-            Map<String, String> attributes = message.getAttributes();
-            if (attributes == null) {
-              attributes = new HashMap<>();
-              message.setAttributes(attributes);
-            }
-            attributes.put(getTimestampLabel(), String.valueOf(c.timestamp().getMillis()));
-          }
-          output.add(message);
-
-          if (output.size() >= MAX_PUBLISH_BATCH_SIZE) {
-            publish();
-          }
-        }
-
-        @Override
-        public void finishBundle(Context c) throws IOException {
-          if (!output.isEmpty()) {
-            publish();
-          }
-        }
-
-        private void publish() throws IOException {
-          PublishRequest publishRequest = new PublishRequest().setMessages(output);
-          pubsubClient.projects().topics()
-              .publish(getTopic().asPath(), publishRequest)
-              .execute();
-          output.clear();
-        }
-      }
-    }
-
-    /** Disallow construction of utility class. */
-    private Write() {}
-  }
-}

