beam-commits mailing list archives

From dhalp...@apache.org
Subject [43/67] [partial] incubator-beam git commit: Directory reorganization
Date Thu, 24 Mar 2016 02:48:07 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/DatastoreIO.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/DatastoreIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/DatastoreIO.java
deleted file mode 100644
index f618bc9..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/DatastoreIO.java
+++ /dev/null
@@ -1,957 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import static com.google.api.services.datastore.DatastoreV1.PropertyFilter.Operator.EQUAL;
-import static com.google.api.services.datastore.DatastoreV1.PropertyOrder.Direction.DESCENDING;
-import static com.google.api.services.datastore.DatastoreV1.QueryResultBatch.MoreResultsType.NOT_FINISHED;
-import static com.google.api.services.datastore.client.DatastoreHelper.getPropertyMap;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeFilter;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeOrder;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeValue;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Verify.verify;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.datastore.DatastoreV1.CommitRequest;
-import com.google.api.services.datastore.DatastoreV1.Entity;
-import com.google.api.services.datastore.DatastoreV1.EntityResult;
-import com.google.api.services.datastore.DatastoreV1.Key;
-import com.google.api.services.datastore.DatastoreV1.Key.PathElement;
-import com.google.api.services.datastore.DatastoreV1.PartitionId;
-import com.google.api.services.datastore.DatastoreV1.Query;
-import com.google.api.services.datastore.DatastoreV1.QueryResultBatch;
-import com.google.api.services.datastore.DatastoreV1.RunQueryRequest;
-import com.google.api.services.datastore.DatastoreV1.RunQueryResponse;
-import com.google.api.services.datastore.client.Datastore;
-import com.google.api.services.datastore.client.DatastoreException;
-import com.google.api.services.datastore.client.DatastoreFactory;
-import com.google.api.services.datastore.client.DatastoreHelper;
-import com.google.api.services.datastore.client.DatastoreOptions;
-import com.google.api.services.datastore.client.QuerySplitter;
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.EntityCoder;
-import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
-import com.google.cloud.dataflow.sdk.io.Sink.WriteOperation;
-import com.google.cloud.dataflow.sdk.io.Sink.Writer;
-import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions;
-import com.google.cloud.dataflow.sdk.options.GcpOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff;
-import com.google.cloud.dataflow.sdk.util.RetryHttpRequestInitializer;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.primitives.Ints;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import javax.annotation.Nullable;
-
-/**
- * <p>{@link DatastoreIO} provides an API to Read and Write {@link PCollection PCollections} of
- * <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a>
- * {@link Entity} objects.
- *
- * <p>Google Cloud Datastore is a fully managed NoSQL data storage service.
- * An {@code Entity} is an object in Datastore, analogous to a row in a traditional
- * database table.
- *
- * <p>This API currently requires an authentication workaround. To use {@link DatastoreIO}, users
- * must use the {@code gcloud} command line tool to get credentials for Datastore:
- * <pre>
- * $ gcloud auth login
- * </pre>
- *
- * <p>To read a {@link PCollection} from a query to Datastore, use {@link DatastoreIO#source} and
- * its methods {@link DatastoreIO.Source#withDataset} and {@link DatastoreIO.Source#withQuery} to
- * specify the dataset to query and the query to read from. You can optionally provide a namespace
- * to query within using {@link DatastoreIO.Source#withNamespace} or a Datastore host using
- * {@link DatastoreIO.Source#withHost}.
- *
- * <p>For example:
- *
- * <pre> {@code
- * // Read a query from Datastore
- * PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
- * Query query = ...;
- * String datasetId = "...";
- * String host = "...";
- *
- * Pipeline p = Pipeline.create(options);
- * PCollection<Entity> entities = p.apply(
- *     Read.from(DatastoreIO.source()
- *         .withDataset(datasetId)
- *         .withQuery(query)
- *         .withHost(host)));
- * } </pre>
- *
- * <p>or:
- *
- * <pre> {@code
- * // Read a query from Datastore using the default namespace and host
- * PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
- * Query query = ...;
- * String datasetId = "...";
- *
- * Pipeline p = Pipeline.create(options);
- * PCollection<Entity> entities = p.apply(DatastoreIO.readFrom(datasetId, query));
- * p.run();
- * } </pre>
- *
- * <p><b>Note:</b> Normally, a Cloud Dataflow job will read from Cloud Datastore in parallel across
- * many workers. However, when the {@link Query} is configured with a limit using
- * {@link com.google.api.services.datastore.DatastoreV1.Query.Builder#setLimit(int)}, then
- * all returned results will be read by a single Dataflow worker in order to ensure correct data.
- *
- * <p>To write a {@link PCollection} to a Datastore, use {@link DatastoreIO#writeTo},
- * specifying the datastore to write to:
- *
- * <pre> {@code
- * PCollection<Entity> entities = ...;
- * entities.apply(DatastoreIO.writeTo(dataset));
- * p.run();
- * } </pre>
- *
- * <p>To optionally change the host that is used to write to the Datastore, use {@link
- * DatastoreIO#sink} to build a {@link DatastoreIO.Sink} and write to it using the {@link Write}
- * transform:
- *
- * <pre> {@code
- * PCollection<Entity> entities = ...;
- * entities.apply(Write.to(DatastoreIO.sink().withDataset(dataset).withHost(host)));
- * } </pre>
- *
- * <p>{@link Entity Entities} in the {@code PCollection} to be written must have complete
- * {@link Key Keys}. A complete {@code Key} specifies the {@code name} or {@code id} of the
- * {@code Entity}, whereas an incomplete {@code Key} does not. A {@code namespace} other than the
- * project default may be written to by specifying it in the {@code Entity} {@code Keys}.
- *
- * <pre>{@code
- * Key.Builder keyBuilder = DatastoreHelper.makeKey(...);
- * keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
- * }</pre>
- *
- * <p>{@code Entities} will be committed as upsert (update or insert) mutations. Please read
- * <a href="https://cloud.google.com/datastore/docs/concepts/entities">Entities, Properties, and
- * Keys</a> for more information about {@code Entity} keys.
- *
- * <h3>Permissions</h3>
- * Permission requirements depend on the {@code PipelineRunner} that is used to execute the
- * Dataflow job. Please refer to the documentation of the corresponding {@code PipelineRunner}s
- * for more details.
- *
- * <p>Please see <a href="https://cloud.google.com/datastore/docs/activate">Cloud Datastore Sign Up
- * </a> for security and permission-related information specific to Datastore.
- *
- * @see com.google.cloud.dataflow.sdk.runners.PipelineRunner
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public class DatastoreIO {
-  public static final String DEFAULT_HOST = "https://www.googleapis.com";
-
-  /**
-   * Datastore has a limit of 500 mutations per batch operation, so we flush
-   * changes to Datastore every 500 entities.
-   */
-  public static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
-
-  /**
-   * Returns an empty {@link DatastoreIO.Source} builder with the default {@code host}.
-   * Configure the {@code dataset}, {@code query}, and {@code namespace} using
-   * {@link DatastoreIO.Source#withDataset}, {@link DatastoreIO.Source#withQuery},
-   * and {@link DatastoreIO.Source#withNamespace}.
-   *
-   * @deprecated the name and return type do not match. Use {@link #source()}.
-   */
-  @Deprecated
-  public static Source read() {
-    return source();
-  }
-
-  /**
-   * Returns an empty {@link DatastoreIO.Source} builder with the default {@code host}.
-   * Configure the {@code dataset}, {@code query}, and {@code namespace} using
-   * {@link DatastoreIO.Source#withDataset}, {@link DatastoreIO.Source#withQuery},
-   * and {@link DatastoreIO.Source#withNamespace}.
-   *
-   * <p>The resulting {@link Source} object can be passed to {@link Read} to create a
-   * {@code PTransform} that will read from Datastore.
-   */
-  public static Source source() {
-    return new Source(DEFAULT_HOST, null, null, null);
-  }
-
-  /**
-   * Returns a {@code PTransform} that reads Datastore entities from the query
-   * against the given dataset.
-   */
-  public static Read.Bounded<Entity> readFrom(String datasetId, Query query) {
-    return Read.from(new Source(DEFAULT_HOST, datasetId, query, null));
-  }
-
-  /**
-   * Returns a {@code PTransform} that reads Datastore entities from the query
-   * against the given dataset and host.
-   *
-   * @deprecated prefer {@link #source()} with {@link Source#withHost}, {@link Source#withDataset},
-   *    and {@link Source#withQuery}.
-   */
-  @Deprecated
-  public static Read.Bounded<Entity> readFrom(String host, String datasetId, Query query) {
-    return Read.from(new Source(host, datasetId, query, null));
-  }
-
-  /**
-   * A {@link Source} that reads the result rows of a Datastore query as {@code Entity} objects.
-   */
-  public static class Source extends BoundedSource<Entity> {
-    public String getHost() {
-      return host;
-    }
-
-    public String getDataset() {
-      return datasetId;
-    }
-
-    public Query getQuery() {
-      return query;
-    }
-
-    @Nullable
-    public String getNamespace() {
-      return namespace;
-    }
-
-    public Source withDataset(String datasetId) {
-      checkNotNull(datasetId, "datasetId");
-      return new Source(host, datasetId, query, namespace);
-    }
-
-    /**
-     * Returns a new {@link Source} that reads the results of the specified query.
-     *
-     * <p>Does not modify this object.
-     *
-     * <p><b>Note:</b> Normally, a Cloud Dataflow job will read from Cloud Datastore in parallel
-     * across many workers. However, when the {@link Query} is configured with a limit using
-     * {@link com.google.api.services.datastore.DatastoreV1.Query.Builder#setLimit(int)}, then all
-     * returned results will be read by a single Dataflow worker in order to ensure correct data.
-     */
-    public Source withQuery(Query query) {
-      checkNotNull(query, "query");
-      checkArgument(!query.hasLimit() || query.getLimit() > 0,
-          "Invalid query limit %s: must be positive", query.getLimit());
-      return new Source(host, datasetId, query, namespace);
-    }
-
-    public Source withHost(String host) {
-      checkNotNull(host, "host");
-      return new Source(host, datasetId, query, namespace);
-    }
-
-    public Source withNamespace(@Nullable String namespace) {
-      return new Source(host, datasetId, query, namespace);
-    }
-
-    @Override
-    public Coder<Entity> getDefaultOutputCoder() {
-      return EntityCoder.of();
-    }
-
-    @Override
-    public boolean producesSortedKeys(PipelineOptions options) {
-      // TODO: Perhaps this can be implemented by inspecting the query.
-      return false;
-    }
-
-    @Override
-    public List<Source> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options)
-        throws Exception {
-      // Users may request a limit on the number of results. We can currently support this by
-      // simply disabling parallel reads and using only a single split.
-      if (query.hasLimit()) {
-        return ImmutableList.of(this);
-      }
-
-      long numSplits;
-      try {
-        numSplits = Math.round(((double) getEstimatedSizeBytes(options)) / desiredBundleSizeBytes);
-      } catch (Exception e) {
-        // Fallback in case estimated size is unavailable. TODO: fix this, it's horrible.
-
-        // 1. Try Dataflow's numWorkers, which will be 0 for other runners.
-        DataflowPipelineWorkerPoolOptions poolOptions =
-            options.as(DataflowPipelineWorkerPoolOptions.class);
-        if (poolOptions.getNumWorkers() > 0) {
-          LOG.warn("Estimated size of unavailable, using the number of workers {}",
-              poolOptions.getNumWorkers(), e);
-          numSplits = poolOptions.getNumWorkers();
-        } else {
-          // 2. Default to 12 in the unknown case.
-          numSplits = 12;
-        }
-      }
-
-      // If the desiredBundleSize or number of workers results in 1 split, simply return
-      // a source that reads from the original query.
-      if (numSplits <= 1) {
-        return ImmutableList.of(this);
-      }
-
-      List<Query> datastoreSplits;
-      try {
-        datastoreSplits = getSplitQueries(Ints.checkedCast(numSplits), options);
-      } catch (IllegalArgumentException | DatastoreException e) {
-        LOG.warn("Unable to parallelize the given query: {}", query, e);
-        return ImmutableList.of(this);
-      }
-
-      ImmutableList.Builder<Source> splits = ImmutableList.builder();
-      for (Query splitQuery : datastoreSplits) {
-        splits.add(new Source(host, datasetId, splitQuery, namespace));
-      }
-      return splits.build();
-    }
-
-    @Override
-    public BoundedReader<Entity> createReader(PipelineOptions pipelineOptions) throws IOException {
-      return new DatastoreReader(this, getDatastore(pipelineOptions));
-    }
-
-    @Override
-    public void validate() {
-      Preconditions.checkNotNull(host, "host");
-      Preconditions.checkNotNull(query, "query");
-      Preconditions.checkNotNull(datasetId, "datasetId");
-    }
-
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-      // Datastore provides no way to get a good estimate of how large the result of a query
-      // will be. As a rough approximation, we attempt to fetch the statistics of the whole
-      // entity kind being queried, using the __Stat_Kind__ system table, assuming exactly 1 kind
-      // is specified in the query.
-      //
-      // See https://cloud.google.com/datastore/docs/concepts/stats
-      if (mockEstimateSizeBytes != null) {
-        return mockEstimateSizeBytes;
-      }
-
-      Datastore datastore = getDatastore(options);
-      if (query.getKindCount() != 1) {
-        throw new UnsupportedOperationException(
-            "Can only estimate size for queries specifying exactly 1 kind.");
-      }
-      String ourKind = query.getKind(0).getName();
-      long latestTimestamp = queryLatestStatisticsTimestamp(datastore);
-      Query.Builder query = Query.newBuilder();
-      if (namespace == null) {
-        query.addKindBuilder().setName("__Stat_Kind__");
-      } else {
-        query.addKindBuilder().setName("__Ns_Stat_Kind__");
-      }
-      query.setFilter(makeFilter(
-          makeFilter("kind_name", EQUAL, makeValue(ourKind)).build(),
-          makeFilter("timestamp", EQUAL, makeValue(latestTimestamp)).build()));
-      RunQueryRequest request = makeRequest(query.build());
-
-      long now = System.currentTimeMillis();
-      RunQueryResponse response = datastore.runQuery(request);
-      LOG.info("Query for per-kind statistics took {}ms", System.currentTimeMillis() - now);
-
-      QueryResultBatch batch = response.getBatch();
-      if (batch.getEntityResultCount() == 0) {
-        throw new NoSuchElementException(
-            "Datastore statistics for kind " + ourKind + " unavailable");
-      }
-      Entity entity = batch.getEntityResult(0).getEntity();
-      return getPropertyMap(entity).get("entity_bytes").getIntegerValue();
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("host", host)
-          .add("dataset", datasetId)
-          .add("query", query)
-          .add("namespace", namespace)
-          .toString();
-    }
-
-    ///////////////////////////////////////////////////////////////////////////////////////////
-
-    private static final Logger LOG = LoggerFactory.getLogger(Source.class);
-    private final String host;
-    /** Not really nullable, but it may be {@code null} for in-progress {@code Source}s. */
-    @Nullable
-    private final String datasetId;
-    /** Not really nullable, but it may be {@code null} for in-progress {@code Source}s. */
-    @Nullable
-    private final Query query;
-    @Nullable
-    private final String namespace;
-
-    /** For testing only. TODO: This could be much cleaner with dependency injection. */
-    @Nullable
-    private QuerySplitter mockSplitter;
-    @Nullable
-    private Long mockEstimateSizeBytes;
-
-    /**
-     * Note that only {@code namespace} is really {@code @Nullable}. The other parameters may be
-     * {@code null} as a matter of build order, but if they are {@code null} at instantiation time,
-     * an error will be thrown.
-     */
-    private Source(
-        String host, @Nullable String datasetId, @Nullable Query query,
-        @Nullable String namespace) {
-      this.host = checkNotNull(host, "host");
-      this.datasetId = datasetId;
-      this.query = query;
-      this.namespace = namespace;
-    }
-
-    /**
-     * A helper function to get the split queries, taking into account the optional
-     * {@code namespace} and whether there is a mock splitter.
-     */
-    private List<Query> getSplitQueries(int numSplits, PipelineOptions options)
-        throws DatastoreException {
-      // If namespace is set, include it in the split request so splits are calculated accordingly.
-      PartitionId.Builder partitionBuilder = PartitionId.newBuilder();
-      if (namespace != null) {
-        partitionBuilder.setNamespace(namespace);
-      }
-
-      if (mockSplitter != null) {
-        // For testing.
-        return mockSplitter.getSplits(query, partitionBuilder.build(), numSplits, null);
-      }
-
-      return DatastoreHelper.getQuerySplitter().getSplits(
-          query, partitionBuilder.build(), numSplits, getDatastore(options));
-    }
-
-    /**
-     * Builds a {@link RunQueryRequest} from the {@code query}, using the properties set on this
-     * {@code Source}. For example, sets the {@code namespace} for the request.
-     */
-    private RunQueryRequest makeRequest(Query query) {
-      RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query);
-      if (namespace != null) {
-        requestBuilder.getPartitionIdBuilder().setNamespace(namespace);
-      }
-      return requestBuilder.build();
-    }
-
-    /**
-     * Datastore system tables with statistics are periodically updated. This method fetches
-     * the timestamp of the latest statistics update using the {@code __Stat_Total__} table.
-     */
-    private long queryLatestStatisticsTimestamp(Datastore datastore) throws DatastoreException {
-      Query.Builder query = Query.newBuilder();
-      query.addKindBuilder().setName("__Stat_Total__");
-      query.addOrder(makeOrder("timestamp", DESCENDING));
-      query.setLimit(1);
-      RunQueryRequest request = makeRequest(query.build());
-
-      long now = System.currentTimeMillis();
-      RunQueryResponse response = datastore.runQuery(request);
-      LOG.info("Query for latest stats timestamp of dataset {} took {}ms", datasetId,
-          System.currentTimeMillis() - now);
-      QueryResultBatch batch = response.getBatch();
-      if (batch.getEntityResultCount() == 0) {
-        throw new NoSuchElementException(
-            "Datastore total statistics for dataset " + datasetId + " unavailable");
-      }
-      Entity entity = batch.getEntityResult(0).getEntity();
-      return getPropertyMap(entity).get("timestamp").getTimestampMicrosecondsValue();
-    }
-
-    private Datastore getDatastore(PipelineOptions pipelineOptions) {
-      DatastoreOptions.Builder builder =
-          new DatastoreOptions.Builder().host(host).dataset(datasetId).initializer(
-              new RetryHttpRequestInitializer());
-
-      Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
-      if (credential != null) {
-        builder.credential(credential);
-      }
-      return DatastoreFactory.get().create(builder.build());
-    }
-
-    /** For testing only. */
-    Source withMockSplitter(QuerySplitter splitter) {
-      Source res = new Source(host, datasetId, query, namespace);
-      res.mockSplitter = splitter;
-      res.mockEstimateSizeBytes = mockEstimateSizeBytes;
-      return res;
-    }
-
-    /** For testing only. */
-    Source withMockEstimateSizeBytes(Long estimateSizeBytes) {
-      Source res = new Source(host, datasetId, query, namespace);
-      res.mockSplitter = mockSplitter;
-      res.mockEstimateSizeBytes = estimateSizeBytes;
-      return res;
-    }
-  }
-
-  ///////////////////// Write Class /////////////////////////////////
-
-  /**
-   * Returns a new {@link DatastoreIO.Sink} builder using the default host.
-   * You need to further configure it using {@link DatastoreIO.Sink#withDataset}, and optionally
-   * {@link DatastoreIO.Sink#withHost} before using it in a {@link Write} transform.
-   *
-   * <p>For example: {@code p.apply(Write.to(DatastoreIO.sink().withDataset(dataset)));}
-   */
-  public static Sink sink() {
-    return new Sink(DEFAULT_HOST, null);
-  }
-
-  /**
-   * Returns a new {@link Write} transform that will write to a {@link Sink}.
-   *
-   * <p>For example: {@code p.apply(DatastoreIO.writeTo(dataset));}
-   */
-  public static Write.Bound<Entity> writeTo(String datasetId) {
-    return Write.to(sink().withDataset(datasetId));
-  }
-
-  /**
-   * A {@link Sink} that writes a {@link PCollection} containing
-   * {@link Entity Entities} to a Cloud Datastore dataset.
-   *
-   */
-  public static class Sink extends com.google.cloud.dataflow.sdk.io.Sink<Entity> {
-    final String host;
-    final String datasetId;
-
-    /**
-     * Returns a {@link Sink} that is like this one, but will write to the specified dataset.
-     */
-    public Sink withDataset(String datasetId) {
-      checkNotNull(datasetId, "datasetId");
-      return new Sink(host, datasetId);
-    }
-
-    /**
-     * Returns a {@link Sink} that is like this one, but will use the given host.  If not specified,
-     * the {@link DatastoreIO#DEFAULT_HOST default host} will be used.
-     */
-    public Sink withHost(String host) {
-      checkNotNull(host, "host");
-      return new Sink(host, datasetId);
-    }
-
-    /**
-     * Constructs a Sink with given host and dataset.
-     */
-    protected Sink(String host, String datasetId) {
-      this.host = checkNotNull(host, "host");
-      this.datasetId = datasetId;
-    }
-
-    /**
-     * Ensures the host and dataset are set.
-     */
-    @Override
-    public void validate(PipelineOptions options) {
-      Preconditions.checkNotNull(
-          host, "Host is a required parameter. Please use withHost to set the host.");
-      Preconditions.checkNotNull(
-          datasetId,
-          "Dataset ID is a required parameter. Please use withDataset to to set the datasetId.");
-    }
-
-    @Override
-    public DatastoreWriteOperation createWriteOperation(PipelineOptions options) {
-      return new DatastoreWriteOperation(this);
-    }
-  }
-
-  /**
-   * A {@link WriteOperation} that will manage a parallel write to a Datastore sink.
-   */
-  private static class DatastoreWriteOperation
-      extends WriteOperation<Entity, DatastoreWriteResult> {
-    private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriteOperation.class);
-
-    private final DatastoreIO.Sink sink;
-
-    public DatastoreWriteOperation(DatastoreIO.Sink sink) {
-      this.sink = sink;
-    }
-
-    @Override
-    public Coder<DatastoreWriteResult> getWriterResultCoder() {
-      return SerializableCoder.of(DatastoreWriteResult.class);
-    }
-
-    @Override
-    public void initialize(PipelineOptions options) throws Exception {}
-
-    /**
-     * Finalizes the write.  Logs the number of entities written to the Datastore.
-     */
-    @Override
-    public void finalize(Iterable<DatastoreWriteResult> writerResults, PipelineOptions options)
-        throws Exception {
-      long totalEntities = 0;
-      for (DatastoreWriteResult result : writerResults) {
-        totalEntities += result.entitiesWritten;
-      }
-      LOG.info("Wrote {} elements.", totalEntities);
-    }
-
-    @Override
-    public DatastoreWriter createWriter(PipelineOptions options) throws Exception {
-      DatastoreOptions.Builder builder =
-          new DatastoreOptions.Builder()
-              .host(sink.host)
-              .dataset(sink.datasetId)
-              .initializer(new RetryHttpRequestInitializer());
-      Credential credential = options.as(GcpOptions.class).getGcpCredential();
-      if (credential != null) {
-        builder.credential(credential);
-      }
-      Datastore datastore = DatastoreFactory.get().create(builder.build());
-
-      return new DatastoreWriter(this, datastore);
-    }
-
-    @Override
-    public DatastoreIO.Sink getSink() {
-      return sink;
-    }
-  }
-
-  /**
-   * {@link Writer} that writes entities to a Datastore Sink.  Entities are written in batches,
-   * where the maximum batch size is {@link DatastoreIO#DATASTORE_BATCH_UPDATE_LIMIT}.  Entities
-   * are committed as upsert mutations (either update if the key already exists, or insert if it is
-   * a new key).  If an entity does not have a complete key (i.e., it has no name or id), the bundle
-   * will fail.
-   *
-   * <p>See <a
-   * href="https://cloud.google.com/datastore/docs/concepts/entities#Datastore_Creating_an_entity">
-   * Datastore: Entities, Properties, and Keys</a> for information about entity keys and upsert
-   * mutations.
-   *
-   * <p>Commits are non-transactional.  If a commit fails because of a conflict over an entity
-   * group, the commit will be retried (up to {@link DatastoreWriter#MAX_RETRIES}
-   * times).
-   *
-   * <p>Visible for testing purposes.
-   */
-  static class DatastoreWriter extends Writer<Entity, DatastoreWriteResult> {
-    private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriter.class);
-    private final DatastoreWriteOperation writeOp;
-    private final Datastore datastore;
-    private long totalWritten = 0;
-
-    // Visible for testing.
-    final List<Entity> entities = new ArrayList<>();
-
-    /**
-     * Since a bundle is written in batches, we should retry the commit of a batch in order to
-     * prevent transient errors from causing the bundle to fail.
-     */
-    private static final int MAX_RETRIES = 5;
-
-    /**
-     * Initial backoff time for exponential backoff for retry attempts.
-     */
-    private static final int INITIAL_BACKOFF_MILLIS = 5000;
-
-    /**
-     * Returns true if a Datastore key is complete.  A key is complete if its last element
-     * has either an id or a name.
-     */
-    static boolean isValidKey(Key key) {
-      List<PathElement> elementList = key.getPathElementList();
-      if (elementList.isEmpty()) {
-        return false;
-      }
-      PathElement lastElement = elementList.get(elementList.size() - 1);
-      return (lastElement.hasId() || lastElement.hasName());
-    }
-
-    // Visible for testing
-    DatastoreWriter(DatastoreWriteOperation writeOp, Datastore datastore) {
-      this.writeOp = writeOp;
-      this.datastore = datastore;
-    }
-
-    @Override
-    public void open(String uId) throws Exception {}
-
-    /**
-     * Writes an entity to the Datastore.  Writes are batched, up to {@link
-     * DatastoreIO#DATASTORE_BATCH_UPDATE_LIMIT}. If an entity does not have a complete key, an
-     * {@link IllegalArgumentException} will be thrown.
-     */
-    @Override
-    public void write(Entity value) throws Exception {
-      // Verify that the entity to write has a complete key.
-      if (!isValidKey(value.getKey())) {
-        throw new IllegalArgumentException(
-            "Entities to be written to the Datastore must have complete keys");
-      }
-
-      entities.add(value);
-
-      if (entities.size() >= DatastoreIO.DATASTORE_BATCH_UPDATE_LIMIT) {
-        flushBatch();
-      }
-    }
-
-    /**
-     * Flushes any pending batch writes and returns a DatastoreWriteResult.
-     */
-    @Override
-    public DatastoreWriteResult close() throws Exception {
-      if (entities.size() > 0) {
-        flushBatch();
-      }
-      return new DatastoreWriteResult(totalWritten);
-    }
-
-    @Override
-    public DatastoreWriteOperation getWriteOperation() {
-      return writeOp;
-    }
-
-    /**
-     * Writes a batch of entities to the Datastore.
-     *
-     * <p>If a commit fails, it will be retried (up to {@link DatastoreWriter#MAX_RETRIES}
-     * times).  All entities in the batch will be committed again, even if the commit was partially
-     * successful. If the retry limit is exceeded, the last exception from the Datastore will be
-     * thrown.
-     *
-     * @throws DatastoreException if the commit fails.
-     * @throws IOException if backing off between retries fails.
-     * @throws InterruptedException if the backoff between retries is interrupted.
-     */
-    private void flushBatch() throws DatastoreException, IOException, InterruptedException {
-      LOG.debug("Writing batch of {} entities", entities.size());
-      Sleeper sleeper = Sleeper.DEFAULT;
-      BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
-
-      while (true) {
-        // Batch upsert entities.
-        try {
-          CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
-          commitRequest.getMutationBuilder().addAllUpsert(entities);
-          commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
-          datastore.commit(commitRequest.build());
-
-          // Break if the commit threw no exception.
-          break;
-
-        } catch (DatastoreException exception) {
-          // Only log the code and message for potentially-transient errors. The entire exception
-          // will be propagated upon the last retry.
-          LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
-              exception.getMessage());
-          if (!BackOffUtils.next(sleeper, backoff)) {
-            LOG.error("Aborting after {} retries.", MAX_RETRIES);
-            throw exception;
-          }
-        }
-      }
-      totalWritten += entities.size();
-      LOG.debug("Successfully wrote {} entities", entities.size());
-      entities.clear();
-    }
-  }
-
-  private static class DatastoreWriteResult implements Serializable {
-    final long entitiesWritten;
-
-    public DatastoreWriteResult(long recordsWritten) {
-      this.entitiesWritten = recordsWritten;
-    }
-  }
-
-  /**
-   * A {@link Source.Reader} over the records from a query of the datastore.
-   *
-   * <p>Timestamped records are currently not supported.
-   * All records implicitly have the timestamp of {@code BoundedWindow.TIMESTAMP_MIN_VALUE}.
-   */
-  public static class DatastoreReader extends BoundedSource.BoundedReader<Entity> {
-    private final Source source;
-
-    /**
-     * Datastore to read from.
-     */
-    private final Datastore datastore;
-
-    /**
-     * True if more results may be available.
-     */
-    private boolean moreResults;
-
-    /**
-     * Iterator over records.
-     */
-    private java.util.Iterator<EntityResult> entities;
-
-    /**
-     * Current batch of query results.
-     */
-    private QueryResultBatch currentBatch;
-
-    /**
-     * Maximum number of results to request per query.
-     *
-     * <p>Must be set, or it may result in an I/O error when querying
-     * Cloud Datastore.
-     */
-    private static final int QUERY_BATCH_LIMIT = 500;
-
-    /**
-     * Remaining user-requested limit on the number of results to return. If the user did not set a
-     * limit, then this variable will always have the value {@link Integer#MAX_VALUE} and will never
-     * be decremented.
-     */
-    private int userLimit;
-
-    private Entity currentEntity;
-
-    /**
-     * Constructs a DatastoreReader over the given Source and Datastore connection.
-     *
-     * @param datastore a datastore connection to use.
-     */
-    public DatastoreReader(Source source, Datastore datastore) {
-      this.source = source;
-      this.datastore = datastore;
-      // If the user set a limit on the query, remember it. Otherwise pin to MAX_VALUE.
-      userLimit = source.query.hasLimit() ? source.query.getLimit() : Integer.MAX_VALUE;
-    }
-
-    @Override
-    public Entity getCurrent() {
-      return currentEntity;
-    }
-
-    @Override
-    public boolean start() throws IOException {
-      return advance();
-    }
-
-    @Override
-    public boolean advance() throws IOException {
-      if (entities == null || (!entities.hasNext() && moreResults)) {
-        try {
-          entities = getIteratorAndMoveCursor();
-        } catch (DatastoreException e) {
-          throw new IOException(e);
-        }
-      }
-
-      if (entities == null || !entities.hasNext()) {
-        currentEntity = null;
-        return false;
-      }
-
-      currentEntity = entities.next().getEntity();
-      return true;
-    }
-
-    @Override
-    public void close() throws IOException {
-      // Nothing
-    }
-
-    @Override
-    public DatastoreIO.Source getCurrentSource() {
-      return source;
-    }
-
-    @Override
-    public DatastoreIO.Source splitAtFraction(double fraction) {
-      // Not supported.
-      return null;
-    }
-
-    @Override
-    public Double getFractionConsumed() {
-      // Not supported.
-      return null;
-    }
-
-    /**
-     * Returns an iterator over the next batch of records for the query
-     * and updates the cursor to get the next batch as needed.
-     * Each request uses a batch limit and the end cursor of the previous batch, if any.
-     */
-    private Iterator<EntityResult> getIteratorAndMoveCursor() throws DatastoreException {
-      Query.Builder query = source.query.toBuilder().clone();
-      query.setLimit(Math.min(userLimit, QUERY_BATCH_LIMIT));
-      if (currentBatch != null && currentBatch.hasEndCursor()) {
-        query.setStartCursor(currentBatch.getEndCursor());
-      }
-
-      RunQueryRequest request = source.makeRequest(query.build());
-      RunQueryResponse response = datastore.runQuery(request);
-
-      currentBatch = response.getBatch();
-
-      // MORE_RESULTS_AFTER_LIMIT is not implemented yet:
-      // https://groups.google.com/forum/#!topic/gcd-discuss/iNs6M1jA2Vw, so
-      // use result count to determine if more results might exist.
-      int numFetch = currentBatch.getEntityResultCount();
-      if (source.query.hasLimit()) {
-        verify(userLimit >= numFetch,
-            "Expected userLimit %s >= numFetch %s, because query limit %s should be <= userLimit",
-            userLimit, numFetch, query.getLimit());
-        userLimit -= numFetch;
-      }
-      moreResults =
-          // User-limit does not exist (so userLimit == MAX_VALUE) and/or has not been satisfied.
-          (userLimit > 0)
-          // All indications from the API are that there are/may be more results.
-          && ((numFetch == QUERY_BATCH_LIMIT) || (currentBatch.getMoreResults() == NOT_FINISHED));
-
-      // May receive a batch of 0 results if the number of records is a multiple
-      // of the request limit.
-      if (numFetch == 0) {
-        return null;
-      }
-
-      return currentBatch.getEntityResultList().iterator();
-    }
-  }
-}
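
For context, a minimal end-to-end sketch of the API removed above, assembled from the class-level javadoc of DatastoreIO.java; the kind name and dataset ids are placeholders, and the pre-reorganization com.google.cloud.dataflow.sdk package layout is assumed:

import com.google.api.services.datastore.DatastoreV1.Entity;
import com.google.api.services.datastore.DatastoreV1.Query;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.DatastoreIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class DatastoreCopySketch {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    // Query all entities of a placeholder kind; the kind and dataset ids here are assumptions.
    Query.Builder queryBuilder = Query.newBuilder();
    queryBuilder.addKindBuilder().setName("MyKind");
    Query query = queryBuilder.build();

    // Read via the convenience factory (default namespace and host), as shown in the javadoc above.
    PCollection<Entity> entities = p.apply(DatastoreIO.readFrom("my-source-dataset", query));

    // Write the entities back out as upsert mutations; batching and retries are handled by the sink.
    entities.apply(DatastoreIO.writeTo("my-destination-dataset"));

    p.run();
  }
}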

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java
deleted file mode 100644
index dda500c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/FileBasedSink.java
+++ /dev/null
@@ -1,864 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.io;
-
-import com.google.api.client.googleapis.batch.BatchRequest;
-import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
-import com.google.api.client.googleapis.json.GoogleJsonError;
-import com.google.api.client.http.HttpHeaders;
-import com.google.api.client.http.HttpRequestInitializer;
-import com.google.api.services.storage.Storage;
-import com.google.api.services.storage.StorageRequest;
-import com.google.api.services.storage.model.StorageObject;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
-import com.google.cloud.dataflow.sdk.options.GcsOptions;
-import com.google.cloud.dataflow.sdk.options.PipelineOptions;
-import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
-import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
-import com.google.cloud.dataflow.sdk.util.FileIOChannelFactory;
-import com.google.cloud.dataflow.sdk.util.GcsIOChannelFactory;
-import com.google.cloud.dataflow.sdk.util.IOChannelFactory;
-import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
-import com.google.cloud.dataflow.sdk.util.MimeTypes;
-import com.google.cloud.dataflow.sdk.util.Transport;
-import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.common.base.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.channels.WritableByteChannel;
-import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
-import java.nio.file.Paths;
-import java.nio.file.StandardCopyOption;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-import javax.annotation.concurrent.NotThreadSafe;
-
-/**
- * Abstract {@link Sink} for file-based output. An implementation of FileBasedSink writes file-based
- * output and defines the format of output files (how values are written, headers/footers, MIME
- * type, etc.).
- *
- * <p>At pipeline construction time, the methods of FileBasedSink are called to validate the sink
- * and to create a {@link Sink.WriteOperation} that manages the process of writing to the sink.
- *
- * <p>The process of writing to a file-based sink is as follows:
- * <ol>
- * <li>An optional subclass-defined initialization,
- * <li>a parallel write of bundles to temporary files, and finally,
- * <li>these temporary files are renamed with final output filenames.
- * </ol>
- *
- * <p>Supported file systems are those registered with {@link IOChannelUtils}.
- *
- * @param <T> the type of values written to the sink.
- */
-public abstract class FileBasedSink<T> extends Sink<T> {
-  /**
-   * Base filename for final output files.
-   */
-  protected final String baseOutputFilename;
-
-  /**
-   * The extension to be used for the final output files.
-   */
-  protected final String extension;
-
-  /**
-   * Naming template for output files. See {@link ShardNameTemplate} for a description of
-   * possible naming templates.  Default is {@link ShardNameTemplate#INDEX_OF_MAX}.
-   */
-  protected final String fileNamingTemplate;
-
-  /**
-   * Construct a FileBasedSink with the given base output filename and extension.
-   */
-  public FileBasedSink(String baseOutputFilename, String extension) {
-    this(baseOutputFilename, extension, ShardNameTemplate.INDEX_OF_MAX);
-  }
-
-  /**
-   * Construct a FileBasedSink with the given base output filename, extension, and file naming
-   * template.
-   *
-   * <p>See {@link ShardNameTemplate} for a description of file naming templates.
-   */
-  public FileBasedSink(String baseOutputFilename, String extension, String fileNamingTemplate) {
-    this.baseOutputFilename = baseOutputFilename;
-    this.extension = extension;
-    this.fileNamingTemplate = fileNamingTemplate;
-  }
-
-  /**
-   * Returns the base output filename for this file based sink.
-   */
-  public String getBaseOutputFilename() {
-    return baseOutputFilename;
-  }
-
-  /**
-   * Perform pipeline-construction-time validation. The default implementation is a no-op.
-   * Subclasses should override this method to ensure the sink is valid and can be written to. It
-   * is recommended to use {@link Preconditions} in the implementation of this method.
-   */
-  @Override
-  public void validate(PipelineOptions options) {}
-
-  /**
-   * Return a subclass of {@link FileBasedSink.FileBasedWriteOperation} that will manage the write
-   * to the sink.
-   */
-  @Override
-  public abstract FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options);
-
-  /**
-   * Abstract {@link Sink.WriteOperation} that manages the process of writing to a
-   * {@link FileBasedSink}.
-   *
-   * <p>The primary responsibility of the FileBasedWriteOperation is the management of output
-   * files. During a write, {@link FileBasedSink.FileBasedWriter}s write bundles to temporary file
-   * locations. After the bundles have been written,
-   * <ol>
-   * <li>{@link FileBasedSink.FileBasedWriteOperation#finalize} is given a list of the temporary
-   * files containing the output bundles.
-   * <li>During finalize, these temporary files are copied to final output locations and named
-   * according to a file naming template.
-   * <li>Finally, any temporary files that were created during the write are removed.
-   * </ol>
-   *
-   * <p>Subclass implementations of FileBasedWriteOperation must implement
-   * {@link FileBasedSink.FileBasedWriteOperation#createWriter} to return a concrete
-   * FileBasedSinkWriter.
-   *
-   * <h2>Temporary and Output File Naming:</h2> During the write, bundles are written to temporary
-   * files using the baseTemporaryFilename that can be provided via the constructor of
-   * FileBasedWriteOperation. These temporary files will be named
-   * {@code {baseTemporaryFilename}-temp-{bundleId}}, where bundleId is the unique id of the bundle.
-   * For example, if baseTemporaryFilename is "gs://my-bucket/my_temp_output", the output for a
-   * bundle with bundle id 15723 will be "gs://my-bucket/my_temp_output-temp-15723".
-   *
-   * <p>Final output files are written to baseOutputFilename with the format
-   * {@code {baseOutputFilename}-0000i-of-0000n.{extension}} where n is the total number of bundles
-   * written and extension is the file extension. Both baseOutputFilename and extension are required
-   * constructor arguments.
-   *
-   * <p>Subclass implementations can change the file naming template by supplying a value for
-   * {@link FileBasedSink#fileNamingTemplate}.
-   *
-   * <h2>Temporary Bundle File Handling:</h2>
-   * <p>{@link FileBasedSink.FileBasedWriteOperation#temporaryFileRetention} controls the behavior
-   * for managing temporary files. By default, temporary files will be removed. Subclasses can
-   * provide a different value to the constructor.
-   *
-   * <p>Note that in the case of permanent failure of a bundle's write, no clean up of temporary
-   * files will occur.
-   *
-   * <p>If there are no elements in the PCollection being written, no output will be generated.
-   *
-   * @param <T> the type of values written to the sink.
-   */
-  public abstract static class FileBasedWriteOperation<T> extends WriteOperation<T, FileResult> {
-    private static final Logger LOG = LoggerFactory.getLogger(FileBasedWriteOperation.class);
-
-    /**
-     * Options for handling of temporary output files.
-     */
-    public enum TemporaryFileRetention {
-      KEEP,
-      REMOVE;
-    }
-
-    /**
-     * The Sink that this WriteOperation will write to.
-     */
-    protected final FileBasedSink<T> sink;
-
-    /**
-     * Option to keep or remove temporary output files.
-     */
-    protected final TemporaryFileRetention temporaryFileRetention;
-
-    /**
-     * Base filename used for temporary output files. Default is the baseOutputFilename.
-     */
-    protected final String baseTemporaryFilename;
-
-    /**
-     * Name separator for temporary files. Temporary files will be named
-     * {@code {baseTemporaryFilename}-temp-{bundleId}}.
-     */
-    protected static final String TEMPORARY_FILENAME_SEPARATOR = "-temp-";
-
-    /**
-     * Build a temporary filename using the temporary filename separator with the given prefix and
-     * suffix.
-     */
-    protected static final String buildTemporaryFilename(String prefix, String suffix) {
-      return prefix + FileBasedWriteOperation.TEMPORARY_FILENAME_SEPARATOR + suffix;
-    }
-
-    /**
-     * Construct a FileBasedWriteOperation using the same base filename for both temporary and
-     * output files.
-     *
-     * @param sink the FileBasedSink that will be used to configure this write operation.
-     */
-    public FileBasedWriteOperation(FileBasedSink<T> sink) {
-      this(sink, sink.baseOutputFilename);
-    }
-
-    /**
-     * Construct a FileBasedWriteOperation.
-     *
-     * @param sink the FileBasedSink that will be used to configure this write operation.
-     * @param baseTemporaryFilename the base filename to be used for temporary output files.
-     */
-    public FileBasedWriteOperation(FileBasedSink<T> sink, String baseTemporaryFilename) {
-      this(sink, baseTemporaryFilename, TemporaryFileRetention.REMOVE);
-    }
-
-    /**
-     * Create a new FileBasedWriteOperation.
-     *
-     * @param sink the FileBasedSink that will be used to configure this write operation.
-     * @param baseTemporaryFilename the base filename to be used for temporary output files.
-     * @param temporaryFileRetention defines how temporary files are handled.
-     */
-    public FileBasedWriteOperation(FileBasedSink<T> sink, String baseTemporaryFilename,
-        TemporaryFileRetention temporaryFileRetention) {
-      this.sink = sink;
-      this.baseTemporaryFilename = baseTemporaryFilename;
-      this.temporaryFileRetention = temporaryFileRetention;
-    }
-
-    /**
-     * Clients must implement this method to return a subclass of
-     * {@link FileBasedSink.FileBasedWriter}. This method must satisfy the restrictions placed on
-     * implementations of {@link Sink.WriteOperation#createWriter}. Namely, it must not mutate the
-     * state of the object.
-     */
-    @Override
-    public abstract FileBasedWriter<T> createWriter(PipelineOptions options) throws Exception;
-
-    /**
-     * Initialization of the sink. Default implementation is a no-op. May be overridden by subclass
-     * implementations to perform initialization of the sink at pipeline runtime. This method must
-     * be idempotent and is subject to the same implementation restrictions as
-     * {@link Sink.WriteOperation#initialize}.
-     */
-    @Override
-    public void initialize(PipelineOptions options) throws Exception {}
-
-    /**
-     * Finalizes writing by copying temporary output files to their final location and optionally
-     * removing temporary files.
-     *
-     * <p>Finalization may be overridden by subclass implementations to perform customized
-     * finalization (e.g., initiating some operation on output bundles, merging them, etc.).
-     * {@code writerResults} contains the filenames of written bundles.
-     *
-     * <p>If subclasses override this method, they must guarantee that its implementation is
-     * idempotent, as it may be executed multiple times in the case of failure or for redundancy. It
-     * is a best practice to attempt to make this method atomic.
-     *
-     * @param writerResults the results of writes (FileResult).
-     */
-    @Override
-    public void finalize(Iterable<FileResult> writerResults, PipelineOptions options)
-        throws Exception {
-      // Collect names of temporary files and rename them.
-      List<String> files = new ArrayList<>();
-      for (FileResult result : writerResults) {
-        LOG.debug("Temporary bundle output file {} will be copied.", result.getFilename());
-        files.add(result.getFilename());
-      }
-      copyToOutputFiles(files, options);
-
-      // Optionally remove temporary files.
-      if (temporaryFileRetention == TemporaryFileRetention.REMOVE) {
-        removeTemporaryFiles(options);
-      }
-    }
-
-    /**
-     * Copy temporary files to final output filenames using the file naming template.
-     *
-     * <p>Can be called from subclasses that override {@link FileBasedWriteOperation#finalize}.
-     *
-     * <p>Files will be named according to the file naming template. The order of the output files
-     * will be the same as the sorted order of the input filenames.  In other words, if the input
-     * filenames are ["C", "A", "B"], baseOutputFilename is "file", the extension is ".txt", and
-     * the fileNamingTemplate is "-SSS-of-NNN", the contents of A will be copied to
-     * file-000-of-003.txt, the contents of B will be copied to file-001-of-003.txt, etc.
-     *
-     * @param filenames the filenames of temporary files.
-     * @return a list containing the names of final output files.
-     */
-    protected final List<String> copyToOutputFiles(List<String> filenames, PipelineOptions options)
-        throws IOException {
-      int numFiles = filenames.size();
-      List<String> srcFilenames = new ArrayList<>();
-      List<String> destFilenames = generateDestinationFilenames(numFiles);
-
-      // Sort files for copying.
-      srcFilenames.addAll(filenames);
-      Collections.sort(srcFilenames);
-
-      if (numFiles > 0) {
-        LOG.debug("Copying {} files.", numFiles);
-        FileOperations fileOperations =
-            FileOperationsFactory.getFileOperations(destFilenames.get(0), options);
-        fileOperations.copy(srcFilenames, destFilenames);
-      } else {
-        LOG.info("No output files to write.");
-      }
-
-      return destFilenames;
-    }
-
-    /**
-     * Generate output bundle filenames.
-     */
-    protected final List<String> generateDestinationFilenames(int numFiles) {
-      List<String> destFilenames = new ArrayList<>();
-      String extension = getSink().extension;
-      String baseOutputFilename = getSink().baseOutputFilename;
-      String fileNamingTemplate = getSink().fileNamingTemplate;
-
-      String suffix = getFileExtension(extension);
-      for (int i = 0; i < numFiles; i++) {
-        destFilenames.add(IOChannelUtils.constructName(
-            baseOutputFilename, fileNamingTemplate, suffix, i, numFiles));
-      }
-      return destFilenames;
-    }
-
-    /**
-     * Returns the file extension to be used. If the user did not request a file
-     * extension then this method returns the empty string. Otherwise this method
-     * adds a {@code "."} to the beginning of the user's extension if one is not present.
-     */
-    private String getFileExtension(String usersExtension) {
-      if (usersExtension == null || usersExtension.isEmpty()) {
-        return "";
-      }
-      if (usersExtension.startsWith(".")) {
-        return usersExtension;
-      }
-      return "." + usersExtension;
-    }
-
-    /**
-     * Removes temporary output files. Uses the temporary filename to find files to remove.
-     *
-     * <p>Can be called from subclasses that override {@link FileBasedWriteOperation#finalize}.
-     * <b>Note:</b> If finalize is overridden and does <b>not</b> rename or otherwise finalize
-     * temporary files, this method will remove them.
-     */
-    protected final void removeTemporaryFiles(PipelineOptions options) throws IOException {
-      String pattern = buildTemporaryFilename(baseTemporaryFilename, "*");
-      LOG.debug("Finding temporary bundle output files matching {}.", pattern);
-      FileOperations fileOperations = FileOperationsFactory.getFileOperations(pattern, options);
-      IOChannelFactory factory = IOChannelUtils.getFactory(pattern);
-      Collection<String> matches = factory.match(pattern);
-      LOG.debug("{} temporary files matched {}", matches.size(), pattern);
-      LOG.debug("Removing {} files.", matches.size());
-      fileOperations.remove(matches);
-    }
-
-    /**
-     * Provides a coder for {@link FileBasedSink.FileResult}.
-     */
-    @Override
-    public Coder<FileResult> getWriterResultCoder() {
-      return SerializableCoder.of(FileResult.class);
-    }
-
-    /**
-     * Returns the FileBasedSink for this write operation.
-     */
-    @Override
-    public FileBasedSink<T> getSink() {
-      return sink;
-    }
-  }
-
-  /**
-   * Abstract {@link Sink.Writer} that writes a bundle to a {@link FileBasedSink}. Subclass
-   * implementations provide a method that can write a single value to a {@link WritableByteChannel}
-   * ({@link Sink.Writer#write}).
-   *
-   * <p>Subclass implementations may also override methods that write headers and footers before and
-   * after the values in a bundle, respectively, as well as provide a MIME type for the output
-   * channel.
-   *
-   * <p>Multiple FileBasedWriter instances may be created on the same worker, and therefore any
-   * access to static members or methods should be thread safe.
-   *
-   * @param <T> the type of values to write.
-   */
-  public abstract static class FileBasedWriter<T> extends Writer<T, FileResult> {
-    private static final Logger LOG = LoggerFactory.getLogger(FileBasedWriter.class);
-
-    final FileBasedWriteOperation<T> writeOperation;
-
-    /**
-     * Unique id for this output bundle.
-     */
-    private String id;
-
-    /**
-     * The filename of the output bundle: the base temporary filename with the
-     * {@link FileBasedSink.FileBasedWriteOperation#TEMPORARY_FILENAME_SEPARATOR} and the bundle id
-     * appended.
-     */
-    private String filename;
-
-    /**
-     * The channel to write to.
-     */
-    private WritableByteChannel channel;
-
-    /**
-     * The MIME type used in the creation of the output channel (if the file system supports it).
-     *
-     * <p>GCS, for example, supports writing files with Content-Type metadata.
-     *
-     * <p>May be overridden. Default is {@link MimeTypes#TEXT}. See {@link MimeTypes} for other
-     * options.
-     */
-    protected String mimeType = MimeTypes.TEXT;
-
-    /**
-     * Constructs a new FileBasedWriter for the given write operation.
-     */
-    public FileBasedWriter(FileBasedWriteOperation<T> writeOperation) {
-      Preconditions.checkNotNull(writeOperation);
-      this.writeOperation = writeOperation;
-    }
-
-    /**
-     * Called with the channel that a subclass will write its header, footer, and values to.
-     * Subclasses should either keep a reference to the channel provided or create and keep a
-     * reference to an appropriate object that they will use to write to it.
-     *
-     * <p>Called before any subsequent calls to writeHeader, writeFooter, and write.
-     */
-    protected abstract void prepareWrite(WritableByteChannel channel) throws Exception;
-
-    /**
-     * Writes a header at the beginning of output files. Does nothing by default; subclasses
-     * may override.
-     */
-    protected void writeHeader() throws Exception {}
-
-    /**
-     * Writes a footer at the end of output files. Does nothing by default; subclasses
-     * may override.
-     */
-    protected void writeFooter() throws Exception {}
-
-    /**
-     * Opens the channel.
-     */
-    @Override
-    public final void open(String uId) throws Exception {
-      this.id = uId;
-      filename = FileBasedWriteOperation.buildTemporaryFilename(
-          getWriteOperation().baseTemporaryFilename, uId);
-      LOG.debug("Opening {}.", filename);
-      channel = IOChannelUtils.create(filename, mimeType);
-      try {
-        prepareWrite(channel);
-        LOG.debug("Writing header to {}.", filename);
-        writeHeader();
-      } catch (Exception e) {
-        // The caller shouldn't have to close() this Writer if it fails to open(), so close the
-        // channel if prepareWrite() or writeHeader() fails.
-        try {
-          LOG.error("Writing header to {} failed, closing channel.", filename);
-          channel.close();
-        } catch (IOException closeException) {
-          // Log exception and mask it.
-          LOG.error("Closing channel for {} failed: {}", filename, closeException.getMessage());
-        }
-        // Throw the exception that caused the write to fail.
-        throw e;
-      }
-      LOG.debug("Starting write of bundle {} to {}.", this.id, filename);
-    }
-
-    /**
-     * Closes the channel and returns the bundle result.
-     */
-    @Override
-    public final FileResult close() throws Exception {
-      try (WritableByteChannel theChannel = channel) {
-        LOG.debug("Writing footer to {}.", filename);
-        writeFooter();
-      }
-      FileResult result = new FileResult(filename);
-      LOG.debug("Result for bundle {}: {}", this.id, filename);
-      return result;
-    }
-
-    /**
-     * Returns the FileBasedWriteOperation that this Writer belongs to.
-     */
-    @Override
-    public FileBasedWriteOperation<T> getWriteOperation() {
-      return writeOperation;
-    }
-  }
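To make the contract above concrete, here is a minimal sketch of a FileBasedWriter that writes each String on its own line. It relies only on what is shown above plus the inherited Writer#write method; it is illustrative, not the SDK's own text writer.

    import com.google.cloud.dataflow.sdk.io.FileBasedSink;
    import java.io.OutputStream;
    import java.nio.channels.Channels;
    import java.nio.channels.WritableByteChannel;
    import java.nio.charset.StandardCharsets;

    class LineWriter extends FileBasedSink.FileBasedWriter<String> {
      private OutputStream out;

      public LineWriter(FileBasedSink.FileBasedWriteOperation<String> writeOperation) {
        super(writeOperation);
        // mimeType defaults to MimeTypes.TEXT, which suits plain text output.
      }

      @Override
      protected void prepareWrite(WritableByteChannel channel) throws Exception {
        // Keep a stream view of the channel; write() below goes through it.
        out = Channels.newOutputStream(channel);
      }

      @Override
      public void write(String value) throws Exception {
        out.write(value.getBytes(StandardCharsets.UTF_8));
        out.write('\n');
      }
    }

A runner drives such a writer through open(uId), a sequence of write(value) calls, and close(), which writes the footer, closes the channel, and returns a FileResult naming the temporary file.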
-
-  /**
-   * Result of a single bundle write. Contains the filename of the bundle.
-   */
-  public static final class FileResult implements Serializable {
-    private final String filename;
-
-    public FileResult(String filename) {
-      this.filename = filename;
-    }
-
-    public String getFilename() {
-      return filename;
-    }
-  }
-
-  // File system operations
-  // Warning: These classes are purposefully private and will be replaced by more robust file I/O
-  // utilities. Not for use outside FileBasedSink.
-
-  /**
-   * Factory for FileOperations.
-   */
-  private static class FileOperationsFactory {
-    /**
-     * Returns a FileOperations implementation based on which IOChannelFactory would be used to
-     * write to a location specification (not necessarily a filename, as it may contain
-     * wildcards).
-     *
-     * <p>Only supports File and GCS locations (currently, the only factories registered with
-     * IOChannelUtils). For other locations, an exception is thrown.
-     */
-    public static FileOperations getFileOperations(String spec, PipelineOptions options)
-        throws IOException {
-      IOChannelFactory factory = IOChannelUtils.getFactory(spec);
-      if (factory instanceof GcsIOChannelFactory) {
-        return new GcsOperations(options);
-      } else if (factory instanceof FileIOChannelFactory) {
-        return new LocalFileOperations();
-      } else {
-        throw new IOException("Unrecognized file system.");
-      }
-    }
-  }
-
-  /**
-   * Copy and remove operations for files. Operations behave as copy-if-existing and
-   * remove-if-existing: they do not throw an exception when a file is not found, so that they
-   * can be retried safely after a transient error.
-   */
-  private static interface FileOperations {
-    /**
-     * Copy a collection of files from one location to another.
-     *
-     * <p>The number of source filenames must equal the number of destination filenames.
-     *
-     * @param srcFilenames the source filenames.
-     * @param destFilenames the destination filenames.
-     */
-    public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException;
-
-    /**
-     * Remove a collection of files.
-     */
-    public void remove(Collection<String> filenames) throws IOException;
-  }
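A sketch of how the factory and the interface are used together; finalizeOutputs is a hypothetical helper name, and since both types are private to FileBasedSink this only applies within the enclosing class.

    private static void finalizeOutputs(
        List<String> temporaryFilenames, List<String> finalFilenames, PipelineOptions options)
        throws IOException {
      // Pick the implementation that matches the location scheme (local file or GCS)...
      FileOperations fileOperations =
          FileOperationsFactory.getFileOperations(finalFilenames.get(0), options);
      // ...copy temporary bundle files to their final names, then delete the temporaries.
      // Both calls skip missing files, so they are safe to retry after a transient failure.
      fileOperations.copy(temporaryFilenames, finalFilenames);
      fileOperations.remove(temporaryFilenames);
    }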
-
-  /**
-   * GCS file system operations.
-   */
-  private static class GcsOperations implements FileOperations {
-    private static final Logger LOG = LoggerFactory.getLogger(GcsOperations.class);
-
-    /**
-     * Maximum number of requests permitted in a GCS batch request.
-     */
-    private static final int MAX_REQUESTS_PER_BATCH = 1000;
-
-    private ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
-    private GcsOptions gcsOptions;
-    private Storage gcs;
-    private BatchHelper batchHelper;
-
-    public GcsOperations(PipelineOptions options) {
-      gcsOptions = options.as(GcsOptions.class);
-      gcs = Transport.newStorageClient(gcsOptions).build();
-      batchHelper =
-          new BatchHelper(gcs.getRequestFactory().getInitializer(), gcs, MAX_REQUESTS_PER_BATCH);
-    }
-
-    @Override
-    public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
-      Preconditions.checkArgument(
-          srcFilenames.size() == destFilenames.size(),
-          String.format("Number of source files {} must equal number of destination files {}",
-              srcFilenames.size(), destFilenames.size()));
-      for (int i = 0; i < srcFilenames.size(); i++) {
-        final GcsPath sourcePath = GcsPath.fromUri(srcFilenames.get(i));
-        final GcsPath destPath = GcsPath.fromUri(destFilenames.get(i));
-        LOG.debug("Copying {} to {}", sourcePath, destPath);
-        Storage.Objects.Copy copyObject = gcs.objects().copy(sourcePath.getBucket(),
-            sourcePath.getObject(), destPath.getBucket(), destPath.getObject(), null);
-        batchHelper.queue(copyObject, new JsonBatchCallback<StorageObject>() {
-          @Override
-          public void onSuccess(StorageObject obj, HttpHeaders responseHeaders) {
-            LOG.debug("Successfully copied {} to {}", sourcePath, destPath);
-          }
-
-          @Override
-          public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
-            // Do nothing on item not found.
-            if (!errorExtractor.itemNotFound(e)) {
-              throw new IOException(e.toString());
-            }
-            LOG.debug("{} does not exist.", sourcePath);
-          }
-        });
-      }
-      batchHelper.flush();
-    }
-
-    @Override
-    public void remove(Collection<String> filenames) throws IOException {
-      for (String filename : filenames) {
-        final GcsPath path = GcsPath.fromUri(filename);
-        LOG.debug("Removing: " + path);
-        Storage.Objects.Delete deleteObject =
-            gcs.objects().delete(path.getBucket(), path.getObject());
-        batchHelper.queue(deleteObject, new JsonBatchCallback<Void>() {
-          @Override
-          public void onSuccess(Void obj, HttpHeaders responseHeaders) throws IOException {
-            LOG.debug("Successfully removed {}", path);
-          }
-
-          @Override
-          public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException {
-            // Do nothing on item not found.
-            if (!errorExtractor.itemNotFound(e)) {
-              throw new IOException(e.toString());
-            }
-            LOG.debug("{} does not exist.", path);
-          }
-        });
-      }
-      batchHelper.flush();
-    }
-  }
-
-  /**
-   * Local file system operations, implemented with {@link Files}.
-   */
-  private static class LocalFileOperations implements FileOperations {
-    private static final Logger LOG = LoggerFactory.getLogger(LocalFileOperations.class);
-
-    @Override
-    public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
-      Preconditions.checkArgument(
-          srcFilenames.size() == destFilenames.size(),
-          String.format("Number of source files {} must equal number of destination files {}",
-              srcFilenames.size(), destFilenames.size()));
-      int numFiles = srcFilenames.size();
-      for (int i = 0; i < numFiles; i++) {
-        String src = srcFilenames.get(i);
-        String dst = destFilenames.get(i);
-        LOG.debug("Copying {} to {}", src, dst);
-        copyOne(src, dst);
-      }
-    }
-
-    private void copyOne(String source, String destination) throws IOException {
-      try {
-        // Copy the source file, replacing the existing destination.
-        Files.copy(Paths.get(source), Paths.get(destination), StandardCopyOption.REPLACE_EXISTING);
-      } catch (NoSuchFileException e) {
-        LOG.debug("{} does not exist.", source);
-        // Suppress exception if file does not exist.
-      }
-    }
-
-    @Override
-    public void remove(Collection<String> filenames) throws IOException {
-      for (String filename : filenames) {
-        LOG.debug("Removing file {}", filename);
-        removeOne(filename);
-      }
-    }
-
-    private void removeOne(String filename) throws IOException {
-      // Delete the file if it exists.
-      boolean deleted = Files.deleteIfExists(Paths.get(filename));
-      if (!deleted) {
-        LOG.debug("{} does not exist.", filename);
-      }
-    }
-  }
-
-  /**
-   * BatchHelper abstracts out the logic for the maximum requests per batch for GCS.
-   *
-   * <p>Copy of
-   * https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/src/main/java/com/google/cloud/hadoop/gcsio/BatchHelper.java
-   *
-   * <p>Copied to prevent Dataflow from depending on the Hadoop-related dependencies that are not
-   * used in Dataflow.  Hadoop-related dependencies will be removed from the Google Cloud Storage
-   * Connector (https://cloud.google.com/hadoop/google-cloud-storage-connector) so that this project
-   * and others may use the connector without introducing unnecessary dependencies.
-   *
-   * <p>This class is not thread-safe; create a new BatchHelper instance per single-threaded logical
-   * grouping of requests.
-   */
-  @NotThreadSafe
-  private static class BatchHelper {
-    /**
-     * Callback that causes a single StorageRequest to be added to the BatchRequest.
-     */
-    protected static interface QueueRequestCallback {
-      void enqueue() throws IOException;
-    }
-
-    private final List<QueueRequestCallback> pendingBatchEntries;
-    private final BatchRequest batch;
-
-    // Number of requests that can be queued into a single actual HTTP request
-    // before a sub-batch is sent.
-    private final long maxRequestsPerBatch;
-
-    // Flag that indicates whether there is an in-progress flush.
-    private boolean flushing = false;
-
-    /**
-     * Constructs a BatchHelper that batches up to {@code maxRequestsPerBatch} requests per
-     * HTTP call against the given Storage client.
-     */
-    public BatchHelper(
-        HttpRequestInitializer requestInitializer, Storage gcs, long maxRequestsPerBatch) {
-      this.pendingBatchEntries = new LinkedList<>();
-      this.batch = gcs.batch(requestInitializer);
-      this.maxRequestsPerBatch = maxRequestsPerBatch;
-    }
-
-    /**
-     * Adds an additional request to the batch, and possibly flushes the current contents of the
-     * batch if {@code maxRequestsPerBatch} has been reached.
-     */
-    public <T> void queue(final StorageRequest<T> req, final JsonBatchCallback<T> callback)
-        throws IOException {
-      QueueRequestCallback queueCallback = new QueueRequestCallback() {
-        @Override
-        public void enqueue() throws IOException {
-          req.queue(batch, callback);
-        }
-      };
-      pendingBatchEntries.add(queueCallback);
-
-      flushIfPossibleAndRequired();
-    }
-
-    // Flush our buffer if we have more pending entries than maxRequestsPerBatch
-    private void flushIfPossibleAndRequired() throws IOException {
-      if (pendingBatchEntries.size() > maxRequestsPerBatch) {
-        flushIfPossible();
-      }
-    }
-
-    // Flush our buffer if we are not already in a flush operation and we have data to flush.
-    private void flushIfPossible() throws IOException {
-      if (!flushing && pendingBatchEntries.size() > 0) {
-        flushing = true;
-        try {
-          while (batch.size() < maxRequestsPerBatch && pendingBatchEntries.size() > 0) {
-            QueueRequestCallback head = pendingBatchEntries.remove(0);
-            head.enqueue();
-          }
-
-          batch.execute();
-        } finally {
-          flushing = false;
-        }
-      }
-    }
-
-    /**
-     * Sends any currently remaining requests in the batch; should be called at the end of any
-     * series of batched requests to ensure everything has been sent.
-     */
-    public void flush() throws IOException {
-      flushIfPossible();
-    }
-  }
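As a usage sketch, mirroring GcsOperations.remove above (the gcs client and the filenames collection are assumed to already exist):

    BatchHelper batchHelper = new BatchHelper(
        gcs.getRequestFactory().getInitializer(), gcs, MAX_REQUESTS_PER_BATCH);
    for (String filename : filenames) {
      final GcsPath path = GcsPath.fromUri(filename);
      batchHelper.queue(
          gcs.objects().delete(path.getBucket(), path.getObject()),
          new JsonBatchCallback<Void>() {
            @Override
            public void onSuccess(Void obj, HttpHeaders responseHeaders) {}

            @Override
            public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders)
                throws IOException {
              throw new IOException(e.toString());
            }
          });
      // queue() transparently sends a full sub-batch once maxRequestsPerBatch is exceeded.
    }
    // Send whatever is still buffered.
    batchHelper.flush();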
-
-  static class ReshardForWrite<T> extends PTransform<PCollection<T>, PCollection<T>> {
-    @Override
-    public PCollection<T> apply(PCollection<T> input) {
-      return input
-          // TODO: This would need to be adapted to write per-window shards.
-          .apply(Window.<T>into(new GlobalWindows())
-                       .triggering(DefaultTrigger.of())
-                       .discardingFiredPanes())
-          .apply("RandomKey", ParDo.of(
-              new DoFn<T, KV<Long, T>>() {
-                transient long counter, step;
-                @Override
-                public void startBundle(Context c) {
-                  counter = (long) (Math.random() * Long.MAX_VALUE);
-                  step = 1 + 2 * (long) (Math.random() * Long.MAX_VALUE);
-                }
-                @Override
-                public void processElement(ProcessContext c) {
-                  counter += step;
-                  c.output(KV.of(counter, c.element()));
-                }
-              }))
-          .apply(GroupByKey.<Long, T>create())
-          .apply("Ungroup", ParDo.of(
-              new DoFn<KV<Long, Iterable<T>>, T>() {
-                @Override
-                public void processElement(ProcessContext c) {
-                  for (T item : c.element().getValue()) {
-                    c.output(item);
-                  }
-                }
-              }));
-    }
-  }
-}
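The ReshardForWrite transform above carries no Javadoc, so a brief note: it redistributes elements by tagging each one with a pseudo-random 64-bit key, grouping by those keys, and immediately flattening the groups again. The element values are unchanged; only their grouping into bundles changes, so a following write is not constrained by the bundling of the stage that produced the data. A usage sketch, with a placeholder pipeline and toy elements (the class is package-private, so this applies within the SDK itself):

    // 'p' is a hypothetical Pipeline; Create.of supplies a few toy elements.
    PCollection<String> lines = p.apply(Create.of("a", "b", "c"));
    PCollection<String> resharded = lines.apply(new ReshardForWrite<String>());
    // 'resharded' contains exactly the same elements as 'lines', but bundled differently,
    // so a subsequent file write may produce differently balanced shards.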

