beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/3] incubator-beam git commit: [BEAM-77] Move bigtable in IO
Date Tue, 26 Apr 2016 21:23:11 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 9746f0d1d -> aa43ec0b0


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
new file mode 100644
index 0000000..d1d5cd6
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>io-parent</artifactId>
+    <version>0.1.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>google-cloud-platform</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: Google Cloud Platform</name>
+  <description>IO library to read and write Google Cloud Platform systems from Beam.</description>
+  <packaging>jar</packaging>
+
+  <properties>
+    <bigtable.version>0.2.3</bigtable.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>java-sdk-all</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.grpc</groupId>
+      <artifactId>grpc-all</artifactId>
+      <version>0.12.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.cloud.bigtable</groupId>
+      <artifactId>bigtable-protos</artifactId>
+      <version>${bigtable.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.cloud.bigtable</groupId>
+      <artifactId>bigtable-client-core</artifactId>
+      <version>${bigtable.version}</version>
+    </dependency>
+
+    <!-- test -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>java-sdk-all</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <version>1.3</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.11</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <version>1.7.14</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
new file mode 100644
index 0000000..f99eb1d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -0,0 +1,1016 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
+import org.apache.beam.sdk.io.Sink.WriteOperation;
+import org.apache.beam.sdk.io.Sink.Writer;
+import org.apache.beam.sdk.io.range.ByteKey;
+import org.apache.beam.sdk.io.range.ByteKeyRange;
+import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.ReleaseInfo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+import com.google.bigtable.v1.Mutation;
+import com.google.bigtable.v1.Row;
+import com.google.bigtable.v1.RowFilter;
+import com.google.bigtable.v1.SampleRowKeysResponse;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Empty;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import javax.annotation.Nullable;
+
+/**
+ * A bounded source and sink for Google Cloud Bigtable.
+ *
+ * <p>For more information, see the online documentation at
+ * <a href="https://cloud.google.com/bigtable/">Google Cloud Bigtable</a>.
+ *
+ * <h3>Reading from Cloud Bigtable</h3>
+ *
+ * <p>The Bigtable source returns a set of rows from a single table, returning a
+ * {@code PCollection<Row>}.
+ *
+ * <p>To configure a Cloud Bigtable source, you must supply a table id and a {@link BigtableOptions}
+ * or builder configured with the project and other information necessary to identify the
+ * Bigtable cluster. A {@link RowFilter} may also optionally be specified using
+ * {@link BigtableIO.Read#withRowFilter}. For example:
+ *
+ * <pre>{@code
+ * BigtableOptions.Builder optionsBuilder =
+ *     new BigtableOptions.Builder()
+ *         .setProjectId("project")
+ *         .setClusterId("cluster")
+ *         .setZoneId("zone");
+ *
+ * Pipeline p = ...;
+ *
+ * // Scan the entire table.
+ * p.apply("read",
+ *     BigtableIO.read()
+ *         .withBigtableOptions(optionsBuilder)
+ *         .withTableId("table"));
+ *
+ * // Scan a subset of rows that match the specified row filter.
+ * p.apply("filtered read",
+ *     BigtableIO.read()
+ *         .withBigtableOptions(optionsBuilder)
+ *         .withTableId("table")
+ *         .withRowFilter(filter));
+ * }</pre>
+ *
+ * <h3>Writing to Cloud Bigtable</h3>
+ *
+ * <p>The Bigtable sink executes a set of row mutations on a single table. It takes as input a
+ * {@link PCollection PCollection&lt;KV&lt;ByteString, Iterable&lt;Mutation&gt;&gt;&gt;}, where the
+ * {@link ByteString} is the key of the row being mutated, and each {@link Mutation} represents an
+ * idempotent transformation to that row.
+ *
+ * <p>To configure a Cloud Bigtable sink, you must supply a table id and a {@link BigtableOptions}
+ * or builder configured with the project and other information necessary to identify the
+ * Bigtable cluster, for example:
+ *
+ * <pre>{@code
+ * BigtableOptions.Builder optionsBuilder =
+ *     new BigtableOptions.Builder()
+ *         .setProjectId("project")
+ *         .setClusterId("cluster")
+ *         .setZoneId("zone");
+ *
+ * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...;
+ *
+ * data.apply("write",
+ *     BigtableIO.write()
+ *         .withBigtableOptions(optionsBuilder)
+ *         .withTableId("table"));
+ * }</pre>
+ *
+ * <h3>Experimental</h3>
+ *
+ * <p>This connector for Cloud Bigtable is considered experimental and may break or receive
+ * backwards-incompatible changes in future versions of the Cloud Dataflow SDK. Cloud Bigtable is
+ * in Beta, and thus it may introduce breaking changes in future revisions of its service or APIs.
+ *
+ * <h3>Permissions</h3>
+ *
+ * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
+ * Dataflow job. Please refer to the documentation of corresponding
+ * {@link PipelineRunner PipelineRunners} for more details.
+ */
+@Experimental
+public class BigtableIO {
+  private static final Logger logger = LoggerFactory.getLogger(BigtableIO.class);
+
+  /**
+   * Creates an uninitialized {@link BigtableIO.Read}. Before use, the {@code Read} must be
+   * initialized with a
+   * {@link BigtableIO.Read#withBigtableOptions(BigtableOptions) BigtableOptions} that specifies
+   * the source Cloud Bigtable cluster, and a {@link BigtableIO.Read#withTableId tableId} that
+   * specifies which table to read. A {@link RowFilter} may also optionally be specified using
+   * {@link BigtableIO.Read#withRowFilter}.
+   */
+  @Experimental
+  public static Read read() {
+    return new Read(null, "", null, null);
+  }
+
+  /**
+   * Creates an uninitialized {@link BigtableIO.Write}. Before use, the {@code Write} must be
+   * initialized with a
+   * {@link BigtableIO.Write#withBigtableOptions(BigtableOptions) BigtableOptions} that specifies
+   * the destination Cloud Bigtable cluster, and a {@link BigtableIO.Write#withTableId tableId} that
+   * specifies which table to write.
+   */
+  @Experimental
+  public static Write write() {
+    return new Write(null, "", null);
+  }
+
+  /**
+   * A {@link PTransform} that reads from Google Cloud Bigtable. See the class-level Javadoc on
+   * {@link BigtableIO} for more information.
+   *
+   * @see BigtableIO
+   */
+  @Experimental
+  public static class Read extends PTransform<PBegin, PCollection<Row>> {
+    /**
+     * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
+     * indicated by the given options, and using any other specified customizations.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withBigtableOptions(BigtableOptions options) {
+      checkNotNull(options, "options");
+      return withBigtableOptions(options.toBuilder());
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable cluster
+     * indicated by the given options, and using any other specified customizations.
+     *
+     * <p>Clones the given {@link BigtableOptions} builder so that any further changes
+     * will have no effect on the returned {@link BigtableIO.Read}.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
+      checkNotNull(optionsBuilder, "optionsBuilder");
+      // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
+      BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
+      BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
+      return new Read(optionsWithAgent, tableId, filter, bigtableService);
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable
+     * using the given row filter.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withRowFilter(RowFilter filter) {
+      checkNotNull(filter, "filter");
+      return new Read(options, tableId, filter, bigtableService);
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Read} that will read from the specified table.
+     *
+     * <p>Does not modify this object.
+     */
+    public Read withTableId(String tableId) {
+      checkNotNull(tableId, "tableId");
+      return new Read(options, tableId, filter, bigtableService);
+    }
+
+    /**
+     * Returns the Google Cloud Bigtable cluster being read from, and other parameters.
+     */
+    public BigtableOptions getBigtableOptions() {
+      return options;
+    }
+
+    /**
+     * Returns the table being read from.
+     */
+    public String getTableId() {
+      return tableId;
+    }
+
+    /**
+     * Builds a {@code BigtableSource} over all keys of the configured table, with no size
+     * estimate, and applies a read from that source to the pipeline of {@code input}.
+     */
+    @Override
+    public PCollection<Row> apply(PBegin input) {
+      BigtableService service = getBigtableService();
+      BigtableSource allKeysSource =
+          new BigtableSource(service, tableId, filter, ByteKeyRange.ALL_KEYS, null);
+      return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(allKeysSource));
+    }
+
+    /**
+     * Checks that options and a table id were supplied and that the table exists. An
+     * {@link IOException} raised while checking for the table is logged as a warning and does
+     * not fail validation.
+     */
+    @Override
+    public void validate(PBegin input) {
+      checkArgument(options != null, "BigtableOptions not specified");
+      checkArgument(!tableId.isEmpty(), "Table ID not specified");
+
+      boolean tableFound;
+      try {
+        tableFound = getBigtableService().tableExists(tableId);
+      } catch (IOException e) {
+        // Best effort only: the existence check could not be completed, so carry on.
+        logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
+        return;
+      }
+      checkArgument(tableFound, "Table %s does not exist", tableId);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+
+      builder.add("tableId", tableId);
+
+      if (options != null) {
+        builder.add("bigtableOptions", options.toString());
+      }
+
+      if (filter != null) {
+        builder.add("rowFilter", filter.toString());
+      }
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(Read.class)
+          .add("options", options)
+          .add("tableId", tableId)
+          .add("filter", filter)
+          .toString();
+    }
+
+    /////////////////////////////////////////////////////////////////////////////////////////
+    /**
+     * Used to define the Cloud Bigtable cluster and any options for the networking layer.
+     * Cannot actually be {@code null} at validation time, but may start out {@code null} while
+     * source is being built.
+     */
+    @Nullable private final BigtableOptions options;
+    private final String tableId;
+    @Nullable private final RowFilter filter;
+    @Nullable private final BigtableService bigtableService;
+
+    private Read(
+        @Nullable BigtableOptions options,
+        String tableId,
+        @Nullable RowFilter filter,
+        @Nullable BigtableService bigtableService) {
+      this.options = options;
+      this.tableId = checkNotNull(tableId, "tableId");
+      this.filter = filter;
+      this.bigtableService = bigtableService;
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Read} that will read using the given Cloud Bigtable
+     * service implementation.
+     *
+     * <p>This is used for testing.
+     *
+     * <p>Does not modify this object.
+     */
+    Read withBigtableService(BigtableService bigtableService) {
+      checkNotNull(bigtableService, "bigtableService");
+      return new Read(options, tableId, filter, bigtableService);
+    }
+
+    /**
+     * Returns the service supplied through {@link #withBigtableService} if there is one;
+     * otherwise creates a new {@code BigtableServiceImpl} from the configured options.
+     */
+    private BigtableService getBigtableService() {
+      return (bigtableService != null) ? bigtableService : new BigtableServiceImpl(options);
+    }
+  }
+
+  /**
+   * A {@link PTransform} that writes to Google Cloud Bigtable. See the class-level Javadoc on
+   * {@link BigtableIO} for more information.
+   *
+   * @see BigtableIO
+   */
+  @Experimental
+  public static class Write
+      extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> {
+    /**
+     * Used to define the Cloud Bigtable cluster and any options for the networking layer.
+     * Cannot actually be {@code null} at validation time, but may start out {@code null} while
+     * sink is being built.
+     */
+    @Nullable private final BigtableOptions options;
+    private final String tableId;
+    @Nullable private final BigtableService bigtableService;
+
+    private Write(
+        @Nullable BigtableOptions options,
+        String tableId,
+        @Nullable BigtableService bigtableService) {
+      this.options = options;
+      this.tableId = checkNotNull(tableId, "tableId");
+      this.bigtableService = bigtableService;
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable cluster
+     * indicated by the given options, and using any other specified customizations.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withBigtableOptions(BigtableOptions options) {
+      checkNotNull(options, "options");
+      return withBigtableOptions(options.toBuilder());
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable cluster
+     * indicated by the given options, and using any other specified customizations.
+     *
+     * <p>Clones the given {@link BigtableOptions} builder so that any further changes
+     * will have no effect on the returned {@link BigtableIO.Write}.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) {
+      checkNotNull(optionsBuilder, "optionsBuilder");
+      // TODO: is there a better way to clone a Builder? Want it to be immune from user changes.
+      BigtableOptions.Builder clonedBuilder = optionsBuilder.build().toBuilder();
+      BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
+      return new Write(optionsWithAgent, tableId, bigtableService);
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Write} that will write to the specified table.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withTableId(String tableId) {
+      checkNotNull(tableId, "tableId");
+      return new Write(options, tableId, bigtableService);
+    }
+
+    /**
+     * Returns the Google Cloud Bigtable cluster being written to, and other parameters.
+     */
+    public BigtableOptions getBigtableOptions() {
+      return options;
+    }
+
+    /**
+     * Returns the table being written to.
+     */
+    public String getTableId() {
+      return tableId;
+    }
+
+    /**
+     * Wraps the configured table id and Bigtable service in a {@code Sink} and writes
+     * {@code input} to it.
+     */
+    @Override
+    public PDone apply(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
+      BigtableService service = getBigtableService();
+      return input.apply(org.apache.beam.sdk.io.Write.to(new Sink(tableId, service)));
+    }
+
+    /**
+     * Checks that options and a table id were supplied and that the table exists. An
+     * {@link IOException} raised while checking for the table is logged as a warning and does
+     * not fail validation.
+     */
+    @Override
+    public void validate(PCollection<KV<ByteString, Iterable<Mutation>>> input) {
+      checkArgument(options != null, "BigtableOptions not specified");
+      checkArgument(!tableId.isEmpty(), "Table ID not specified");
+
+      boolean tableFound;
+      try {
+        tableFound = getBigtableService().tableExists(tableId);
+      } catch (IOException e) {
+        // Best effort only: the existence check could not be completed, so carry on.
+        logger.warn("Error checking whether table {} exists; proceeding.", tableId, e);
+        return;
+      }
+      checkArgument(tableFound, "Table %s does not exist", tableId);
+    }
+
+    /**
+     * Returns a new {@link BigtableIO.Write} that will write using the given Cloud Bigtable
+     * service implementation.
+     *
+     * <p>This is used for testing.
+     *
+     * <p>Does not modify this object.
+     */
+    Write withBigtableService(BigtableService bigtableService) {
+      checkNotNull(bigtableService, "bigtableService");
+      return new Write(options, tableId, bigtableService);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+
+      builder.add("tableId", tableId);
+
+      if (options != null) {
+        builder.add("bigtableOptions", options.toString());
+      }
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(Write.class)
+          .add("options", options)
+          .add("tableId", tableId)
+          .toString();
+    }
+
+    /**
+     * Returns the service supplied through {@link #withBigtableService} if there is one;
+     * otherwise creates a new {@code BigtableServiceImpl} from the configured options.
+     */
+    private BigtableService getBigtableService() {
+      return (bigtableService != null) ? bigtableService : new BigtableServiceImpl(options);
+    }
+  }
+
+  //////////////////////////////////////////////////////////////////////////////////////////
+  /** Disallow construction of utility class. */
+  private BigtableIO() {}
+
+  static class BigtableSource extends BoundedSource<Row> {
+    public BigtableSource(
+        BigtableService service,
+        String tableId,
+        @Nullable RowFilter filter,
+        ByteKeyRange range,
+        Long estimatedSizeBytes) {
+      this.service = service;
+      this.tableId = tableId;
+      this.filter = filter;
+      this.range = range;
+      this.estimatedSizeBytes = estimatedSizeBytes;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(BigtableSource.class)
+          .add("tableId", tableId)
+          .add("filter", filter)
+          .add("range", range)
+          .add("estimatedSizeBytes", estimatedSizeBytes)
+          .toString();
+    }
+
+    ////// Private state and internal implementation details //////
+    private final BigtableService service;
+    @Nullable private final String tableId;
+    @Nullable private final RowFilter filter;
+    private final ByteKeyRange range;
+    @Nullable private Long estimatedSizeBytes;
+    @Nullable private transient List<SampleRowKeysResponse> sampleRowKeys;
+
+    protected BigtableSource withStartKey(ByteKey startKey) {
+      checkNotNull(startKey, "startKey");
+      return new BigtableSource(
+          service, tableId, filter, range.withStartKey(startKey), estimatedSizeBytes);
+    }
+
+    protected BigtableSource withEndKey(ByteKey endKey) {
+      checkNotNull(endKey, "endKey");
+      return new BigtableSource(
+          service, tableId, filter, range.withEndKey(endKey), estimatedSizeBytes);
+    }
+
+    protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) {
+      checkNotNull(estimatedSizeBytes, "estimatedSizeBytes");
+      return new BigtableSource(service, tableId, filter, range, estimatedSizeBytes);
+    }
+
+    /**
+     * Makes an API call to the Cloud Bigtable service that gives information about tablet key
+     * boundaries and estimated sizes. We can use these samples to ensure that splits are on
+     * different tablets, and possibly generate sub-splits within tablets.
+     */
+    private List<SampleRowKeysResponse> getSampleRowKeys() throws IOException {
+      return service.getSampleRowKeys(this);
+    }
+
+    /**
+     * Splits this source into sub-sources using the table's sampled row keys. The requested
+     * bundle size is raised, when necessary, to at least the estimated size of this source
+     * divided by 4000, which limits the number of splits.
+     */
+    @Override
+    public List<BigtableSource> splitIntoBundles(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      // Upper limit on the number of splits; bundles grow rather than exceeding it.
+      final long splitLimit = 4000;
+      long smallestAllowedBundleBytes = getEstimatedSizeBytes(options) / splitLimit;
+      long effectiveBundleBytes = Math.max(smallestAllowedBundleBytes, desiredBundleSizeBytes);
+
+      // The sample-driven helper holds the actual splitting logic.
+      return splitIntoBundlesBasedOnSamples(effectiveBundleBytes, getSampleRowKeys());
+    }
+
+    /**
+     * Helper that splits this source into bundles based on Cloud Bigtable sampled row keys.
+     *
+     * <p>Each sample supplies the end key of a region and a byte offset; the difference between
+     * consecutive offsets is used as the region's size. Regions that overlap this source's range
+     * are clamped to the range and subdivided into bundle-sized pieces. A final region is added
+     * when the range extends past the last sampled key.
+     *
+     * @param desiredBundleSizeBytes target size in bytes for each generated split
+     * @param sampleRowKeys sampled row keys, expected in non-decreasing offset order
+     * @return the generated splits, or only this source if no samples are available
+     * @throws IllegalStateException if a sample's byte offset is smaller than the previous one
+     */
+    private List<BigtableSource> splitIntoBundlesBasedOnSamples(
+        long desiredBundleSizeBytes, List<SampleRowKeysResponse> sampleRowKeys) {
+      // There are no regions, or no samples available. Just scan the entire range.
+      if (sampleRowKeys.isEmpty()) {
+        logger.info("Not splitting source {} because no sample row keys are available.", this);
+        return Collections.singletonList(this);
+      }
+
+      logger.info(
+          "About to split into bundles of size {} with sampleRowKeys length {} first element {}",
+          desiredBundleSizeBytes,
+          sampleRowKeys.size(),
+          sampleRowKeys.get(0));
+
+      // Loop through all sampled responses and generate splits from the ones that overlap the
+      // scan range. The main complication is that we must track the end range of the previous
+      // sample to generate good ranges.
+      ByteKey lastEndKey = ByteKey.EMPTY;
+      long lastOffset = 0;
+      ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder();
+      for (SampleRowKeysResponse response : sampleRowKeys) {
+        ByteKey responseEndKey = ByteKey.of(response.getRowKey());
+        long responseOffset = response.getOffsetBytes();
+        checkState(
+            responseOffset >= lastOffset,
+            "Expected response byte offset %s to come after the last offset %s",
+            responseOffset,
+            lastOffset);
+
+        if (!range.overlaps(ByteKeyRange.of(lastEndKey, responseEndKey))) {
+          // This region does not overlap the scan, so skip it.
+          lastOffset = responseOffset;
+          lastEndKey = responseEndKey;
+          continue;
+        }
+
+        // Calculate the beginning of the split as the larger of startKey and the end of the last
+        // split. Unspecified start is smallest key so is correctly treated as earliest key.
+        ByteKey splitStartKey = lastEndKey;
+        if (splitStartKey.compareTo(range.getStartKey()) < 0) {
+          splitStartKey = range.getStartKey();
+        }
+
+        // Calculate the end of the split as the smaller of endKey and the end of this sample. Note
+        // that range.containsKey handles the case when range.getEndKey() is empty.
+        ByteKey splitEndKey = responseEndKey;
+        if (!range.containsKey(splitEndKey)) {
+          splitEndKey = range.getEndKey();
+        }
+
+        // We know this region overlaps the desired key range, and we know a rough estimate of its
+        // size. Split the key range into bundle-sized chunks and then add them all as splits.
+        long sampleSizeBytes = responseOffset - lastOffset;
+        List<BigtableSource> subSplits =
+            splitKeyRangeIntoBundleSizedSubranges(
+                sampleSizeBytes,
+                desiredBundleSizeBytes,
+                ByteKeyRange.of(splitStartKey, splitEndKey));
+        splits.addAll(subSplits);
+
+        // Move to the next region.
+        lastEndKey = responseEndKey;
+        lastOffset = responseOffset;
+      }
+
+      // We must add one more region after the end of the samples if both these conditions hold:
+      //  1. we did not scan to the end yet (lastEndKey is concrete, not 0-length).
+      //  2. we want to scan to the end (endKey is empty) or farther (lastEndKey < endKey).
+      if (!lastEndKey.isEmpty()
+          && (range.getEndKey().isEmpty() || lastEndKey.compareTo(range.getEndKey()) < 0)) {
+        // As with the sampled regions, never start before this source's own start key. Without
+        // this clamp, a source whose range lies entirely past the last sample would be widened
+        // to begin at the last sampled key.
+        ByteKey tailStartKey = lastEndKey;
+        if (tailStartKey.compareTo(range.getStartKey()) < 0) {
+          tailStartKey = range.getStartKey();
+        }
+        splits.add(this.withStartKey(tailStartKey).withEndKey(range.getEndKey()));
+      }
+
+      List<BigtableSource> ret = splits.build();
+      logger.info("Generated {} splits. First split: {}", ret.size(), ret.get(0));
+      return ret;
+    }
+
+    /**
+     * Returns the estimated size of this source in bytes. If no estimate has been set yet, one
+     * is computed from the sampled row keys and stored in this source before being returned.
+     */
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws IOException {
+      if (estimatedSizeBytes != null) {
+        return estimatedSizeBytes;
+      }
+      // Delegate to testable helper and remember the result.
+      estimatedSizeBytes = getEstimatedSizeBytesBasedOnSamples(getSampleRowKeys());
+      return estimatedSizeBytes;
+    }
+
+    /**
+     * Sums the sizes of the sampled regions that overlap the key range of this source. A region's
+     * size is the difference between its sample's byte offset and the previous sample's offset.
+     */
+    private long getEstimatedSizeBytesBasedOnSamples(List<SampleRowKeysResponse> samples) {
+      long totalBytes = 0;
+      long previousOffset = 0;
+      ByteKey regionStart = ByteKey.EMPTY;
+      // TODO: In future, Bigtable service may provide finer grained APIs, e.g., to sample given a
+      // filter or to sample on a given key range.
+      for (SampleRowKeysResponse sample : samples) {
+        ByteKey regionEnd = ByteKey.of(sample.getRowKey());
+        long offset = sample.getOffsetBytes();
+
+        // An empty region contributes nothing and does not advance the region start.
+        boolean emptyRegion = !regionStart.isEmpty() && regionStart.equals(regionEnd);
+        if (!emptyRegion) {
+          if (range.overlaps(ByteKeyRange.of(regionStart, regionEnd))) {
+            totalBytes += offset - previousOffset;
+          }
+          regionStart = regionEnd;
+        }
+        previousOffset = offset;
+      }
+      return totalBytes;
+    }
+
+    /**
+     * Cloud Bigtable returns query results ordered by key.
+     */
+    @Override
+    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+      return true;
+    }
+
+    @Override
+    public BoundedReader<Row> createReader(PipelineOptions options) throws IOException {
+      return new BigtableReader(this, service);
+    }
+
+    /**
+     * Checks that a non-empty table id was supplied.
+     *
+     * @throws IllegalArgumentException if the table id is {@code null} or empty
+     */
+    @Override
+    public void validate() {
+      // tableId is declared @Nullable, so guard against null rather than failing with an NPE.
+      checkArgument(tableId != null && !tableId.isEmpty(), "tableId cannot be empty");
+    }
+
+    @Override
+    public Coder<Row> getDefaultOutputCoder() {
+      return ProtoCoder.of(Row.class);
+    }
+
+    /**
+     * Helper that divides {@code range} into sub-sources of roughly {@code desiredBundleSizeBytes}
+     * each, treating {@code sampleSizeBytes} as the size of the whole range. Each sub-source is
+     * given an equal share of {@code sampleSizeBytes} as its size estimate.
+     */
+    private List<BigtableSource> splitKeyRangeIntoBundleSizedSubranges(
+        long sampleSizeBytes, long desiredBundleSizeBytes, ByteKeyRange range) {
+      logger.debug(
+          "Subsplit for sampleSizeBytes {} and desiredBundleSizeBytes {}",
+          sampleSizeBytes,
+          desiredBundleSizeBytes);
+
+      // Nothing to subdivide when the region already fits in a single bundle.
+      boolean fitsInOneBundle = sampleSizeBytes <= desiredBundleSizeBytes;
+      if (fitsInOneBundle) {
+        BigtableSource whole =
+            this.withStartKey(range.getStartKey()).withEndKey(range.getEndKey());
+        return Collections.singletonList(whole);
+      }
+
+      checkArgument(
+          sampleSizeBytes > 0, "Sample size %s bytes must be greater than 0.", sampleSizeBytes);
+      checkArgument(
+          desiredBundleSizeBytes > 0,
+          "Desired bundle size %s bytes must be greater than 0.",
+          desiredBundleSizeBytes);
+
+      double bundlesNeeded = ((double) sampleSizeBytes) / desiredBundleSizeBytes;
+      int splitCount = (int) Math.ceil(bundlesNeeded);
+      long bytesPerSplit = sampleSizeBytes / splitCount;
+
+      // Walk consecutive pairs of split keys; each pair bounds one sub-source.
+      ImmutableList.Builder<BigtableSource> subSources = ImmutableList.builder();
+      Iterator<ByteKey> boundaries = range.split(splitCount).iterator();
+      ByteKey lower = boundaries.next();
+      while (boundaries.hasNext()) {
+        ByteKey upper = boundaries.next();
+        subSources.add(
+            this.withStartKey(lower).withEndKey(upper).withEstimatedSizeBytes(bytesPerSplit));
+        lower = upper;
+      }
+      return subSources.build();
+    }
+
+    public ByteKeyRange getRange() {
+      return range;
+    }
+
+    public RowFilter getRowFilter() {
+      return filter;
+    }
+
+    public String getTableId() {
+      return tableId;
+    }
+  }
+
+  private static class BigtableReader extends BoundedReader<Row> {
+    // Thread-safety: source is protected via synchronization and is only accessed or modified
+    // inside a synchronized block (or constructor, which is the same).
+    private BigtableSource source;
+    private BigtableService service;
+    private BigtableService.Reader reader;
+    private final ByteKeyRangeTracker rangeTracker;
+    private long recordsReturned;
+
+    public BigtableReader(BigtableSource source, BigtableService service) {
+      this.source = source;
+      this.service = service;
+      rangeTracker = ByteKeyRangeTracker.of(source.getRange());
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      reader = service.createReader(getCurrentSource());
+      boolean hasRecord =
+          reader.start()
+              && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()));
+      if (hasRecord) {
+        ++recordsReturned;
+      }
+      return hasRecord;
+    }
+
+    @Override
+    public synchronized BigtableSource getCurrentSource() {
+      return source;
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      boolean hasRecord =
+          reader.advance()
+              && rangeTracker.tryReturnRecordAt(true, ByteKey.of(reader.getCurrentRow().getKey()));
+      if (hasRecord) {
+        ++recordsReturned;
+      }
+      return hasRecord;
+    }
+
+    @Override
+    public Row getCurrent() throws NoSuchElementException {
+      return reader.getCurrentRow();
+    }
+
+    @Override
+    public void close() throws IOException {
+      logger.info("Closing reader after reading {} records.", recordsReturned);
+      if (reader != null) {
+        reader.close();
+        reader = null;
+      }
+    }
+
+    @Override
+    public final Double getFractionConsumed() {
+      return rangeTracker.getFractionConsumed();
+    }
+
+    @Override
+    public final synchronized BigtableSource splitAtFraction(double fraction) {
+      ByteKey splitKey;
+      try {
+        splitKey = source.getRange().interpolateKey(fraction);
+      } catch (IllegalArgumentException e) {
+        logger.info("{}: Failed to interpolate key for fraction {}.", source.getRange(), fraction);
+        return null; // The fraction could not be mapped to a key, so decline to split.
+      }
+      logger.debug(
+          "Proposing to split {} at fraction {} (key {})", rangeTracker, fraction, splitKey);
+      if (!rangeTracker.trySplitAtPosition(splitKey)) {
+        return null; // The range tracker rejected splitKey as a split position.
+      }
+      BigtableSource primary = source.withEndKey(splitKey);
+      BigtableSource residual = source.withStartKey(splitKey);
+      this.source = primary; // This reader keeps the part that ends at splitKey.
+      return residual; // The caller receives the part that starts at splitKey.
+    }
+  }
+
+  private static class Sink
+      extends org.apache.beam.sdk.io.Sink<KV<ByteString, Iterable<Mutation>>> {
+
+    public Sink(String tableId, BigtableService bigtableService) {
+      this.tableId = checkNotNull(tableId, "tableId");
+      this.bigtableService = checkNotNull(bigtableService, "bigtableService");
+    }
+
+    public String getTableId() {
+      return tableId;
+    }
+
+    public BigtableService getBigtableService() {
+      return bigtableService;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(Sink.class)
+          .add("bigtableService", bigtableService)
+          .add("tableId", tableId)
+          .toString();
+    }
+
+    ///////////////////////////////////////////////////////////////////////////////
+    private final String tableId;
+    private final BigtableService bigtableService;
+
+    @Override
+    public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> createWriteOperation(
+        PipelineOptions options) {
+      return new BigtableWriteOperation(this);
+    }
+
+    /** Does nothing, as it is redundant with {@link Write#validate}. */
+    @Override
+    public void validate(PipelineOptions options) {}
+  }
+
+  private static class BigtableWriteOperation
+      extends WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> {
+    private final Sink sink;
+
+    public BigtableWriteOperation(Sink sink) {
+      this.sink = sink;
+    }
+
+    @Override
+    public Writer<KV<ByteString, Iterable<Mutation>>, Long> createWriter(PipelineOptions options)
+        throws Exception {
+      return new BigtableWriter(this);
+    }
+
+    @Override
+    public void initialize(PipelineOptions options) {}
+
+    @Override
+    public void finalize(Iterable<Long> writerResults, PipelineOptions options) {
+      long count = 0; // Total records written, summed over every writer's result.
+      for (Long value : writerResults) {
+        count += value;
+      }
+      logger.debug("Wrote {} elements to BigtableIO.Sink {}", count, sink);
+    }
+
+    @Override
+    public Sink getSink() {
+      return sink;
+    }
+
+    @Override
+    public Coder<Long> getWriterResultCoder() {
+      return VarLongCoder.of();
+    }
+  }
+
+  private static class BigtableWriter extends Writer<KV<ByteString, Iterable<Mutation>>, Long> {
+    private final BigtableWriteOperation writeOperation;
+    private final Sink sink;
+    private BigtableService.Writer bigtableWriter;
+    private long recordsWritten;
+    private final ConcurrentLinkedQueue<BigtableWriteException> failures;
+
+    public BigtableWriter(BigtableWriteOperation writeOperation) {
+      this.writeOperation = writeOperation;
+      this.sink = writeOperation.getSink();
+      this.failures = new ConcurrentLinkedQueue<>();
+    }
+
+    @Override
+    public void open(String uId) throws Exception {
+      bigtableWriter = sink.getBigtableService().openForWriting(sink.getTableId());
+      recordsWritten = 0;
+    }
+
+    /**
+     * If any write has asynchronously failed, fail the bundle with a useful error.
+     */
+    private void checkForFailures() throws IOException {
+      // Note that this function is never called by multiple threads and is the only place that
+      // we remove from failures, so this code is safe.
+      if (failures.isEmpty()) {
+        return;
+      }
+
+      StringBuilder logEntry = new StringBuilder();
+      int i = 0;
+      for (; i < 10 && !failures.isEmpty(); ++i) {
+        BigtableWriteException exc = failures.remove();
+        logEntry.append("\n").append(exc.getMessage());
+        if (exc.getCause() != null) {
+          logEntry.append(": ").append(exc.getCause().getMessage());
+        }
+      }
+      String message =
+          String.format(
+              "At least %d errors occurred writing to Bigtable. First %d errors: %s",
+              i + failures.size(),
+              i,
+              logEntry.toString());
+      logger.error(message);
+      throw new IOException(message);
+    }
+
+    @Override
+    public void write(KV<ByteString, Iterable<Mutation>> rowMutations) throws Exception {
+      checkForFailures();
+      Futures.addCallback(
+          bigtableWriter.writeRecord(rowMutations), new WriteExceptionCallback(rowMutations));
+      ++recordsWritten;
+    }
+
+    @Override
+    public Long close() throws Exception {
+      bigtableWriter.close();
+      bigtableWriter = null;
+      checkForFailures();
+      logger.info("Wrote {} records", recordsWritten);
+      return recordsWritten;
+    }
+
+    @Override
+    public WriteOperation<KV<ByteString, Iterable<Mutation>>, Long> getWriteOperation() {
+      return writeOperation;
+    }
+
+    private class WriteExceptionCallback implements FutureCallback<Empty> {
+      private final KV<ByteString, Iterable<Mutation>> value;
+
+      public WriteExceptionCallback(KV<ByteString, Iterable<Mutation>> value) {
+        this.value = value;
+      }
+
+      @Override
+      public void onFailure(Throwable cause) {
+        failures.add(new BigtableWriteException(value, cause));
+      }
+
+      @Override
+      public void onSuccess(Empty produced) {}
+    }
+  }
+
+  /**
+   * An exception that puts information about the failed record being written in its message.
+   */
+  static class BigtableWriteException extends IOException {
+    public BigtableWriteException(KV<ByteString, Iterable<Mutation>> record, Throwable cause) {
+      super(
+          String.format(
+              "Error mutating row %s with mutations %s",
+              record.getKey().toStringUtf8(),
+              record.getValue()),
+          cause);
+    }
+  }
+
+  /**
+   * A helper function to produce a Cloud Bigtable user agent string.
+   */
+  private static String getUserAgent() {
+    String javaVersion = System.getProperty("java.specification.version");
+    ReleaseInfo info = ReleaseInfo.getReleaseInfo();
+    return String.format(
+        "%s/%s (%s); %s",
+        info.getName(),
+        info.getVersion(),
+        javaVersion,
+        "0.2.3" /* TODO get Bigtable client version directly from jar. */);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
new file mode 100644
index 0000000..93e558b
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
+import org.apache.beam.sdk.values.KV;
+
+import com.google.bigtable.v1.Mutation;
+import com.google.bigtable.v1.Row;
+import com.google.bigtable.v1.SampleRowKeysResponse;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Empty;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * An interface for real or fake implementations of Cloud Bigtable.
+ */
+interface BigtableService extends Serializable {
+
+  /**
+   * The interface of a class that can write to Cloud Bigtable.
+   */
+  interface Writer {
+    /**
+     * Writes a single row transaction to Cloud Bigtable. The key of the {@code record} is the
+     * row key to be mutated and the iterable of mutations represent the changes to be made to the
+     * row.
+     *
+     * @throws IOException if there is an error submitting the write.
+     */
+    ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
+        throws IOException;
+
+    /**
+     * Closes the writer.
+     *
+     * @throws IOException if any writes did not succeed
+     */
+    void close() throws IOException;
+  }
+
+  /**
+   * The interface of a class that reads from Cloud Bigtable.
+   */
+  interface Reader {
+    /**
+     * Reads the first element (including initialization, such as opening a network connection) and
+     * returns true if an element was found.
+     */
+    boolean start() throws IOException;
+
+    /**
+     * Attempts to read the next element, and returns true if an element has been read.
+     */
+    boolean advance() throws IOException;
+
+    /**
+     * Closes the reader.
+     *
+     * @throws IOException if there is an error.
+     */
+    void close() throws IOException;
+
+    /**
+     * Returns the last row read by a successful start() or advance(), or throws if there is no
+     * current row because the last such call was unsuccessful.
+     */
+    Row getCurrentRow() throws NoSuchElementException;
+  }
+
+  /**
+   * Returns {@code true} if the table with the given name exists.
+   */
+  boolean tableExists(String tableId) throws IOException;
+
+  /**
+   * Returns a {@link Reader} that will read from the specified source.
+   */
+  Reader createReader(BigtableSource source) throws IOException;
+
+  /**
+   * Returns a {@link Writer} that will write to the specified table.
+   */
+  Writer openForWriting(String tableId) throws IOException;
+
+  /**
+   * Returns a set of row keys sampled from the underlying table. These contain information about
+   * the distribution of keys within the table.
+   */
+  List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
new file mode 100644
index 0000000..5933e13
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
+import org.apache.beam.sdk.values.KV;
+
+import com.google.bigtable.admin.table.v1.GetTableRequest;
+import com.google.bigtable.v1.MutateRowRequest;
+import com.google.bigtable.v1.Mutation;
+import com.google.bigtable.v1.ReadRowsRequest;
+import com.google.bigtable.v1.Row;
+import com.google.bigtable.v1.RowRange;
+import com.google.bigtable.v1.SampleRowKeysRequest;
+import com.google.bigtable.v1.SampleRowKeysResponse;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.cloud.bigtable.grpc.BigtableSession;
+import com.google.cloud.bigtable.grpc.async.AsyncExecutor;
+import com.google.cloud.bigtable.grpc.async.HeapSizeManager;
+import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
+import com.google.common.base.MoreObjects;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Empty;
+
+import io.grpc.Status.Code;
+import io.grpc.StatusRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * An implementation of {@link BigtableService} that actually communicates with the Cloud Bigtable
+ * service.
+ */
+class BigtableServiceImpl implements BigtableService {
+  private static final Logger logger = LoggerFactory.getLogger(BigtableService.class);
+
+  public BigtableServiceImpl(BigtableOptions options) {
+    this.options = options;
+  }
+
+  private final BigtableOptions options;
+
+  @Override
+  public BigtableWriterImpl openForWriting(String tableId) throws IOException {
+    BigtableSession session = new BigtableSession(options);
+    String tableName = options.getClusterName().toTableNameStr(tableId);
+    return new BigtableWriterImpl(session, tableName);
+  }
+
+  @Override
+  public boolean tableExists(String tableId) throws IOException {
+    if (!BigtableSession.isAlpnProviderEnabled()) {
+      logger.info(
+          "Skipping existence check for table {} (BigtableOptions {}) because ALPN is not"
+              + " configured.",
+          tableId,
+          options);
+      return true;
+    }
+
+    try (BigtableSession session = new BigtableSession(options)) {
+      GetTableRequest getTable =
+          GetTableRequest.newBuilder()
+              .setName(options.getClusterName().toTableNameStr(tableId))
+              .build();
+      session.getTableAdminClient().getTable(getTable);
+      return true;
+    } catch (StatusRuntimeException e) {
+      if (e.getStatus().getCode() == Code.NOT_FOUND) {
+        return false;
+      }
+      String message =
+          String.format(
+              "Error checking whether table %s (BigtableOptions %s) exists", tableId, options);
+      logger.error(message, e);
+      throw new IOException(message, e);
+    }
+  }
+
+  private class BigtableReaderImpl implements Reader {
+    private BigtableSession session;
+    private final BigtableSource source;
+    private ResultScanner<Row> results;
+    private Row currentRow;
+
+    public BigtableReaderImpl(BigtableSession session, BigtableSource source) {
+      this.session = session;
+      this.source = source;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      RowRange range =
+          RowRange.newBuilder()
+              .setStartKey(source.getRange().getStartKey().getValue())
+              .setEndKey(source.getRange().getEndKey().getValue())
+              .build();
+      ReadRowsRequest.Builder requestB =
+          ReadRowsRequest.newBuilder()
+              .setRowRange(range)
+              .setTableName(options.getClusterName().toTableNameStr(source.getTableId()));
+      if (source.getRowFilter() != null) {
+        requestB.setFilter(source.getRowFilter());
+      }
+      results = session.getDataClient().readRows(requestB.build());
+      return advance();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      currentRow = results.next();
+      return (currentRow != null);
+    }
+
+    @Override
+    public void close() throws IOException {
+      // Goal: by the end of this function, both results and session are null and closed,
+      // independent of what errors they throw or prior state.
+
+      if (session == null) {
+        // Only possible when previously closed, so we know that results is also null.
+        return;
+      }
+
+      // Session does not implement Closeable -- it's AutoCloseable. So we can't register it with
+      // the Closer, but we can use the Closer to simplify the error handling.
+      try (Closer closer = Closer.create()) {
+        if (results != null) {
+          closer.register(results);
+          results = null;
+        }
+
+        session.close();
+      } finally {
+        session = null;
+      }
+    }
+
+    @Override
+    public Row getCurrentRow() throws NoSuchElementException {
+      if (currentRow == null) {
+        throw new NoSuchElementException();
+      }
+      return currentRow;
+    }
+  }
+
+  private static class BigtableWriterImpl implements Writer {
+    private BigtableSession session;
+    private AsyncExecutor executor;
+    private final MutateRowRequest.Builder partialBuilder;
+
+    public BigtableWriterImpl(BigtableSession session, String tableName) {
+      this.session = session;
+      this.executor =
+          new AsyncExecutor(
+              session.getDataClient(),
+              new HeapSizeManager(
+                  AsyncExecutor.ASYNC_MUTATOR_MAX_MEMORY_DEFAULT,
+                  AsyncExecutor.MAX_INFLIGHT_RPCS_DEFAULT));
+
+      partialBuilder = MutateRowRequest.newBuilder().setTableName(tableName);
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        if (executor != null) {
+          executor.flush();
+          executor = null;
+        }
+      } finally {
+        if (session != null) {
+          session.close();
+          session = null;
+        }
+      }
+    }
+
+    @Override
+    public ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record)
+        throws IOException {
+      MutateRowRequest r =
+          partialBuilder
+              .clone()
+              .setRowKey(record.getKey())
+              .addAllMutations(record.getValue())
+              .build();
+      try {
+        return executor.mutateRowAsync(r);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Write interrupted", e);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects
+        .toStringHelper(BigtableServiceImpl.class)
+        .add("options", options)
+        .toString();
+  }
+
+  @Override
+  public Reader createReader(BigtableSource source) throws IOException {
+    BigtableSession session = new BigtableSession(options);
+    return new BigtableReaderImpl(session, source);
+  }
+
+  @Override
+  public List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) throws IOException {
+    try (BigtableSession session = new BigtableSession(options)) {
+      SampleRowKeysRequest request =
+          SampleRowKeysRequest.newBuilder()
+              .setTableName(options.getClusterName().toTableNameStr(source.getTableId()))
+              .build();
+      return session.getDataClient().sampleRowKeys(request);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/package-info.java
new file mode 100644
index 0000000..c4c8c04
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines transforms for reading and writing from Google Cloud Bigtable.
+ *
+ * @see org.apache.beam.sdk.io.gcp.bigtable.BigtableIO
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
new file mode 100644
index 0000000..403ad9d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -0,0 +1,729 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigtable;
+
+import static org.apache.beam.sdk.testing.SourceTestUtils.assertSourcesEqualReferenceSource;
+import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
+import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
+import static org.apache.beam.sdk.testing.SourceTestUtils
+    .assertSplitAtFractionSucceedsAndConsistent;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Verify.verifyNotNull;
+
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
+import org.apache.beam.sdk.io.range.ByteKey;
+import org.apache.beam.sdk.io.range.ByteKeyRange;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+import com.google.bigtable.v1.Cell;
+import com.google.bigtable.v1.Column;
+import com.google.bigtable.v1.Family;
+import com.google.bigtable.v1.Mutation;
+import com.google.bigtable.v1.Mutation.SetCell;
+import com.google.bigtable.v1.Row;
+import com.google.bigtable.v1.RowFilter;
+import com.google.bigtable.v1.SampleRowKeysResponse;
+import com.google.cloud.bigtable.config.BigtableOptions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Empty;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import javax.annotation.Nullable;
+
+/**
+ * Unit tests for {@link BigtableIO}.
+ */
+@RunWith(JUnit4.class)
+public class BigtableIOTest {
+  // JUnit rule through which individual tests declare the exception they expect.
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  // Rule through which tests verify messages logged for the BigtableIO class.
+  @Rule public ExpectedLogs logged = ExpectedLogs.none(BigtableIO.class);
+
+  /**
+   * These tests require a static instance of the {@link FakeBigtableService} because the writers
+   * go through a serialization step when executing the test and would not affect passed-in objects
+   * otherwise.
+   */
+  private static FakeBigtableService service;
+  // Options shared by every test; the project, cluster and zone ids are placeholder strings.
+  private static final BigtableOptions BIGTABLE_OPTIONS =
+      new BigtableOptions.Builder()
+          .setProjectId("project")
+          .setClusterId("cluster")
+          .setZoneId("zone")
+          .build();
+  // Base read transform carrying BIGTABLE_OPTIONS; setup() rebinds it to the current fake service.
+  private static BigtableIO.Read defaultRead =
+      BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS);
+  // Base write transform carrying BIGTABLE_OPTIONS; setup() rebinds it to the current fake service.
+  private static BigtableIO.Write defaultWrite =
+      BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS);
+  // Coder for the write records built by these tests; looked up from a coder registry in setup().
+  private Coder<KV<ByteString, Iterable<Mutation>>> bigtableCoder;
+  // Type descriptor of the write records, used as the key for the coder lookup in setup().
+  private static final TypeDescriptor<KV<ByteString, Iterable<Mutation>>> BIGTABLE_WRITE_TYPE =
+      new TypeDescriptor<KV<ByteString, Iterable<Mutation>>>() {};
+
+  /**
+   * Installs a fresh {@link FakeBigtableService} before each test, points the default read and
+   * write transforms at it, and looks up the coder for the write record type.
+   */
+  @Before
+  public void setup() throws Exception {
+    FakeBigtableService freshService = new FakeBigtableService();
+    service = freshService;
+    defaultRead = defaultRead.withBigtableService(freshService);
+    defaultWrite = defaultWrite.withBigtableService(freshService);
+
+    TestPipeline coderPipeline = TestPipeline.create();
+    bigtableCoder = coderPipeline.getCoderRegistry().getCoder(BIGTABLE_WRITE_TYPE);
+  }
+
+  /** Checks that a read built with options first and table id second retains both. */
+  @Test
+  public void testReadBuildsCorrectly() {
+    BigtableIO.Read configuredRead =
+        BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS).withTableId("table");
+    BigtableOptions options = configuredRead.getBigtableOptions();
+
+    assertEquals("project", options.getProjectId());
+    assertEquals("cluster", options.getClusterId());
+    assertEquals("zone", options.getZoneId());
+    assertEquals("table", configuredRead.getTableId());
+  }
+
+  /** Checks that a read built with table id first and options second retains both. */
+  @Test
+  public void testReadBuildsCorrectlyInDifferentOrder() {
+    BigtableIO.Read configuredRead =
+        BigtableIO.read().withTableId("table").withBigtableOptions(BIGTABLE_OPTIONS);
+    BigtableOptions options = configuredRead.getBigtableOptions();
+
+    assertEquals("project", options.getProjectId());
+    assertEquals("cluster", options.getClusterId());
+    assertEquals("zone", options.getZoneId());
+    assertEquals("table", configuredRead.getTableId());
+  }
+
+  /** Checks that a write built with options first and table id second retains both. */
+  @Test
+  public void testWriteBuildsCorrectly() {
+    BigtableIO.Write configuredWrite =
+        BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS).withTableId("table");
+    assertEquals("table", configuredWrite.getTableId());
+
+    BigtableOptions options = configuredWrite.getBigtableOptions();
+    assertEquals("project", options.getProjectId());
+    assertEquals("zone", options.getZoneId());
+    assertEquals("cluster", options.getClusterId());
+  }
+
+  /** Checks that a write built with table id first and options second retains both. */
+  @Test
+  public void testWriteBuildsCorrectlyInDifferentOrder() {
+    BigtableIO.Write configuredWrite =
+        BigtableIO.write().withTableId("table").withBigtableOptions(BIGTABLE_OPTIONS);
+    BigtableOptions options = configuredWrite.getBigtableOptions();
+
+    assertEquals("cluster", options.getClusterId());
+    assertEquals("project", options.getProjectId());
+    assertEquals("zone", options.getZoneId());
+    assertEquals("table", configuredWrite.getTableId());
+  }
+
+  /** Checks that validating a write that has options but no table id is rejected. */
+  @Test
+  public void testWriteValidationFailsMissingTable() {
+    BigtableIO.Write writeWithoutTable = BigtableIO.write().withBigtableOptions(BIGTABLE_OPTIONS);
+
+    // Register the expectation only after construction so that only validate() may satisfy it.
+    thrown.expect(IllegalArgumentException.class);
+    writeWithoutTable.validate(null);
+  }
+
+  /** Checks that validating a write that has a table id but no options is rejected. */
+  @Test
+  public void testWriteValidationFailsMissingOptions() {
+    BigtableIO.Write writeWithoutOptions = BigtableIO.write().withTableId("table");
+
+    // Register the expectation only after construction so that only validate() may satisfy it.
+    thrown.expect(IllegalArgumentException.class);
+    writeWithoutOptions.validate(null);
+  }
+
+  /**
+   * Builds a write record for row {@code key} holding one {@link SetCell} mutation whose cell
+   * value is {@code value}; both strings are encoded as UTF-8.
+   */
+  private static KV<ByteString, Iterable<Mutation>> makeWrite(String key, String value) {
+    SetCell.Builder setCell = SetCell.newBuilder().setValue(ByteString.copyFromUtf8(value));
+    Mutation mutation = Mutation.newBuilder().setSetCell(setCell).build();
+    Iterable<Mutation> mutations = ImmutableList.of(mutation);
+    return KV.of(ByteString.copyFromUtf8(key), mutations);
+  }
+
+  /** Builds a write record for row {@code key} whose only mutation is empty (no set cell). */
+  private static KV<ByteString, Iterable<Mutation>> makeBadWrite(String key) {
+    ByteString rowKey = ByteString.copyFromUtf8(key);
+    Mutation emptyMutation = Mutation.newBuilder().build();
+    Iterable<Mutation> mutations = ImmutableList.of(emptyMutation);
+    return KV.of(rowKey, mutations);
+  }
+
+  /** Tests that applying a read of a table the service does not know about is rejected. */
+  @Test
+  public void testReadingFailsTableDoesNotExist() throws Exception {
+    final String table = "TEST-TABLE";
+
+    BigtableIO.Read readOfMissingTable =
+        BigtableIO.read()
+            .withBigtableOptions(BIGTABLE_OPTIONS)
+            .withTableId(table)
+            .withBigtableService(service);
+
+    // Exception will be thrown by read.validate() when read is applied.
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(String.format("Table %s does not exist", table));
+
+    TestPipeline pipeline = TestPipeline.create();
+    pipeline.apply(readOfMissingTable);
+  }
+
+  /**
+   * Tests that when reading from an empty table, the read succeeds, produces no rows, and logs
+   * that the reader was closed after zero records.
+   */
+  @Test
+  public void testReadingEmptyTable() throws Exception {
+    final String table = "TEST-EMPTY-TABLE";
+    service.createTable(table);
+
+    TestPipeline p = TestPipeline.create();
+    PCollection<Row> rows = p.apply(defaultRead.withTableId(table));
+    PAssert.that(rows).empty();
+
+    p.run();
+    // The message has no format arguments, so it is passed as a literal rather than through
+    // String.format, which would only add a pointless pass over the string.
+    logged.verifyInfo("Closing reader after reading 0 records.");
+  }
+
+  /** Tests that a read returns every row of a populated table and logs the record count. */
+  @Test
+  public void testReading() throws Exception {
+    final String table = "TEST-MANY-ROWS-TABLE";
+    final int numRows = 1001;
+    List<Row> expectedRows = makeTableData(table, numRows);
+
+    TestPipeline pipeline = TestPipeline.create();
+    BigtableIO.Read readTable = defaultRead.withTableId(table);
+    PCollection<Row> actualRows = pipeline.apply(readTable);
+    PAssert.that(actualRows).containsInAnyOrder(expectedRows);
+
+    pipeline.run();
+    logged.verifyInfo(String.format("Closing reader after reading %d records.", numRows));
+  }
+
+  /**
+   * A {@link Predicate} that a {@link Row Row's} key matches the given regex.
+   *
+   * <p>The regex is compiled once at construction instead of on every {@link #apply} call, since
+   * the predicate is evaluated once per row. The key, decoded as UTF-8, must match the regex in
+   * its entirety.
+   */
+  private static class KeyMatchesRegex implements Predicate<ByteString> {
+    // Fully qualified so that the file's import block does not need to change.
+    private final java.util.regex.Pattern pattern;
+
+    /**
+     * @param regex the regular expression that the whole row key must match
+     * @throws java.util.regex.PatternSyntaxException if {@code regex} is not a valid pattern
+     */
+    public KeyMatchesRegex(String regex) {
+      this.pattern = java.util.regex.Pattern.compile(regex);
+    }
+
+    @Override
+    public boolean apply(@Nullable ByteString input) {
+      verifyNotNull(input, "input");
+      // Matcher.matches() requires a whole-input match, the same contract as String.matches().
+      return pattern.matcher(input.toStringUtf8()).matches();
+    }
+  }
+
+  /** Tests that a read with a row-key regex filter returns exactly the rows whose keys match. */
+  @Test
+  public void testReadingWithFilter() throws Exception {
+    final String table = "TEST-FILTER-TABLE";
+    final int numRows = 1001;
+    final String regex = ".*17.*";
+    List<Row> allRows = makeTableData(table, numRows);
+
+    // Compute the expected rows independently of the reader by applying the regex to each key.
+    final KeyMatchesRegex keyMatches = new KeyMatchesRegex(regex);
+    Predicate<Row> rowKeyMatches =
+        new Predicate<Row>() {
+          @Override
+          public boolean apply(@Nullable Row input) {
+            verifyNotNull(input, "input");
+            return keyMatches.apply(input.getKey());
+          }
+        };
+    Iterable<Row> expectedRows = Iterables.filter(allRows, rowKeyMatches);
+
+    RowFilter filter =
+        RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(regex)).build();
+    BigtableIO.Read filteredRead = defaultRead.withTableId(table).withRowFilter(filter);
+
+    TestPipeline pipeline = TestPipeline.create();
+    PCollection<Row> actualRows = pipeline.apply(filteredRead);
+    PAssert.that(actualRows).containsInAnyOrder(expectedRows);
+
+    pipeline.run();
+  }
+
+  /**
+   * Exhaustively tests dynamic work rebalancing on a small table.
+   *
+   * <p>This test is slow and therefore ignored by default. Re-enable it when changing the
+   * {@link BigtableIO.Read} implementation.
+   */
+  @Ignore("Slow. Rerun when changing the implementation.")
+  @Test
+  public void testReadingSplitAtFractionExhaustive() throws Exception {
+    final String table = "TEST-FEW-ROWS-SPLIT-EXHAUSTIVE-TABLE";
+    final int numRows = 10;
+    final int numSamples = 1;
+    final long bytesPerRow = 1L;
+
+    makeTableData(table, numRows);
+    service.setupSampleRowKeys(table, numSamples, bytesPerRow);
+
+    ByteKeyRange tableRange = service.getTableRange(table);
+    BigtableSource tableSource =
+        new BigtableSource(service, table, null /*filter*/, tableRange, null /*size*/);
+    assertSplitAtFractionExhaustive(tableSource, null /* options */);
+  }
+
+  /**
+   * Unit tests of splitAtFraction: checks, after a given number of items has been read, which
+   * split fractions are rejected and which succeed consistently.
+   */
+  @Test
+  public void testReadingSplitAtFraction() throws Exception {
+    final String table = "TEST-SPLIT-AT-FRACTION";
+    final int numRows = 10;
+    final int numSamples = 1;
+    final long bytesPerRow = 1L;
+
+    makeTableData(table, numRows);
+    service.setupSampleRowKeys(table, numSamples, bytesPerRow);
+
+    ByteKeyRange tableRange = service.getTableRange(table);
+    BigtableSource tableSource =
+        new BigtableSource(service, table, null /*filter*/, tableRange, null /*size*/);
+
+    // With 0 items read, all split requests will fail.
+    assertSplitAtFractionFails(tableSource, 0, 0.1, null /* options */);
+    assertSplitAtFractionFails(tableSource, 0, 1.0, null /* options */);
+    // With 1 item read, all split requests past 1/10th will succeed.
+    assertSplitAtFractionSucceedsAndConsistent(tableSource, 1, 0.333, null /* options */);
+    assertSplitAtFractionSucceedsAndConsistent(tableSource, 1, 0.666, null /* options */);
+    // With 3 items read, all split requests past 3/10ths will succeed.
+    assertSplitAtFractionFails(tableSource, 3, 0.2, null /* options */);
+    assertSplitAtFractionSucceedsAndConsistent(tableSource, 3, 0.571, null /* options */);
+    assertSplitAtFractionSucceedsAndConsistent(tableSource, 3, 0.9, null /* options */);
+    // With 6 items read, all split requests past 6/10ths will succeed.
+    assertSplitAtFractionFails(tableSource, 6, 0.5, null /* options */);
+    assertSplitAtFractionSucceedsAndConsistent(tableSource, 6, 0.7, null /* options */);
+  }
+
+  /**
+   * Tests that splitting a whole-table source with a bundle size equal to one sample's worth of
+   * bytes yields one split per sample, and that the splits together match the unsplit source.
+   */
+  @Test
+  public void testReadingWithSplits() throws Exception {
+    final String table = "TEST-MANY-ROWS-SPLITS-TABLE";
+    final int numRows = 1500;
+    final int numSamples = 10;
+    final long bytesPerRow = 100L;
+
+    // Populate the table and the sample row keys used for size estimation and splitting.
+    makeTableData(table, numRows);
+    service.setupSampleRowKeys(table, numSamples, bytesPerRow);
+
+    // Split a source covering every key.
+    BigtableSource wholeTable =
+        new BigtableSource(service, table, null /*filter*/, ByteKeyRange.ALL_KEYS, null /*size*/);
+    long desiredBundleSizeBytes = numRows * bytesPerRow / numSamples;
+    List<BigtableSource> bundles =
+        wholeTable.splitIntoBundles(desiredBundleSizeBytes, null /* options */);
+
+    // Check the number of splits and that they are equivalent to the original source.
+    assertThat(bundles, hasSize(numSamples));
+    assertSourcesEqualReferenceSource(wholeTable, bundles, null /* options */);
+  }
+
+  /**
+   * Tests that requesting more splits than there are row-key samples yields the requested number
+   * of splits, and that the splits together match the unsplit source.
+   */
+  @Test
+  public void testReadingWithSubSplits() throws Exception {
+    final String table = "TEST-MANY-ROWS-SPLITS-TABLE";
+    final int numRows = 1000;
+    final int numSamples = 10;
+    final int numSplits = 20;
+    final long bytesPerRow = 100L;
+
+    // Populate the table and the sample row keys used for size estimation and splitting.
+    makeTableData(table, numRows);
+    service.setupSampleRowKeys(table, numSamples, bytesPerRow);
+
+    // Split a source covering every key.
+    BigtableSource wholeTable =
+        new BigtableSource(service, table, null /*filter*/, ByteKeyRange.ALL_KEYS, null /*size*/);
+    long desiredBundleSizeBytes = numRows * bytesPerRow / numSplits;
+    List<BigtableSource> bundles =
+        wholeTable.splitIntoBundles(desiredBundleSizeBytes, null /* options */);
+
+    // Check the number of splits and that they are equivalent to the original source.
+    assertThat(bundles, hasSize(numSplits));
+    assertSourcesEqualReferenceSource(wholeTable, bundles, null /* options */);
+  }
+
+  /**
+   * Tests that a source carrying a row-key regex filter can be split into more splits than there
+   * are row-key samples, and that the splits together match the unsplit filtered source.
+   */
+  @Test
+  public void testReadingWithFilterAndSubSplits() throws Exception {
+    final String table = "TEST-FILTER-SUB-SPLITS";
+    final int numRows = 1700;
+    final int numSamples = 10;
+    final int numSplits = 20;
+    final long bytesPerRow = 100L;
+
+    // Populate the table and the sample row keys used for size estimation and splitting.
+    makeTableData(table, numRows);
+    service.setupSampleRowKeys(table, numSamples, bytesPerRow);
+
+    // Split a filtered source covering every key.
+    ByteString keyRegex = ByteString.copyFromUtf8(".*17.*");
+    RowFilter filter = RowFilter.newBuilder().setRowKeyRegexFilter(keyRegex).build();
+    BigtableSource filteredTable =
+        new BigtableSource(service, table, filter, ByteKeyRange.ALL_KEYS, null /*size*/);
+    long desiredBundleSizeBytes = numRows * bytesPerRow / numSplits;
+    List<BigtableSource> bundles =
+        filteredTable.splitIntoBundles(desiredBundleSizeBytes, null /* options */);
+
+    // Check the number of splits and that they are equivalent to the original source.
+    assertThat(bundles, hasSize(numSplits));
+    assertSourcesEqualReferenceSource(filteredTable, bundles, null /* options */);
+  }
+
+  /** Checks that a read exposes its table id, row filter and options as display data. */
+  @Test
+  public void testReadingDisplayData() {
+    ByteString keyRegex = ByteString.copyFromUtf8("foo.*");
+    RowFilter rowFilter = RowFilter.newBuilder().setRowKeyRegexFilter(keyRegex).build();
+
+    BigtableIO.Read configuredRead =
+        BigtableIO.read()
+            .withBigtableOptions(BIGTABLE_OPTIONS)
+            .withTableId("fooTable")
+            .withRowFilter(rowFilter);
+
+    DisplayData readDisplayData = DisplayData.from(configuredRead);
+
+    assertThat(readDisplayData, hasDisplayItem("tableId", "fooTable"));
+    assertThat(readDisplayData, hasDisplayItem("rowFilter", rowFilter.toString()));
+
+    // BigtableIO adds user-agent to options; assert only on key and not value.
+    assertThat(readDisplayData, hasDisplayItem(hasKey("bigtableOptions")));
+  }
+
+  /** Tests that a single record reaches the fake service and that the write count is logged. */
+  @Test
+  public void testWriting() throws Exception {
+    final String table = "table";
+    final String key = "key";
+    final String value = "value";
+
+    service.createTable(table);
+
+    TestPipeline pipeline = TestPipeline.create();
+    PCollection<KV<ByteString, Iterable<Mutation>>> singleRow =
+        pipeline.apply("single row", Create.of(makeWrite(key, value)).withCoder(bigtableCoder));
+    singleRow.apply("write", defaultWrite.withTableId(table));
+    pipeline.run();
+
+    logged.verifyInfo("Wrote 1 records");
+
+    // Exactly one table exists, and it holds exactly the one written row.
+    assertEquals(1, service.tables.size());
+    Map<ByteString, ByteString> writtenRows = service.getTable(table);
+    assertNotNull(writtenRows);
+    assertEquals(1, writtenRows.size());
+    assertEquals(
+        ByteString.copyFromUtf8(value), writtenRows.get(ByteString.copyFromUtf8(key)));
+  }
+
+  /** Tests that applying a write to a table the service does not know about is rejected. */
+  @Test
+  public void testWritingFailsTableDoesNotExist() throws Exception {
+    final String table = "TEST-TABLE";
+
+    TestPipeline pipeline = TestPipeline.create();
+    PCollection<KV<ByteString, Iterable<Mutation>>> noRecords =
+        pipeline.apply(Create.<KV<ByteString, Iterable<Mutation>>>of());
+
+    // Exception will be thrown by write.validate() when write is applied.
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(String.format("Table %s does not exist", table));
+
+    BigtableIO.Write writeToMissingTable = defaultWrite.withTableId(table);
+    noRecords.apply("write", writeToMissingTable);
+  }
+
+  /** Tests that a record the fake writer rejects makes the whole pipeline run fail. */
+  @Test
+  public void testWritingFailsBadElement() throws Exception {
+    final String table = "TEST-TABLE";
+    final String key = "KEY";
+    service.createTable(table);
+
+    TestPipeline pipeline = TestPipeline.create();
+    PCollection<KV<ByteString, Iterable<Mutation>>> badRecord =
+        pipeline.apply(Create.of(makeBadWrite(key)).withCoder(bigtableCoder));
+    badRecord.apply(defaultWrite.withTableId(table));
+
+    // The failure surfaces only when the pipeline runs, wrapped in a PipelineExecutionException.
+    thrown.expect(PipelineExecutionException.class);
+    thrown.expectCause(Matchers.<Throwable>instanceOf(IOException.class));
+    thrown.expectMessage("At least 1 errors occurred writing to Bigtable. First 1 errors:");
+    thrown.expectMessage("Error mutating row " + key + " with mutations []: cell value missing");
+    pipeline.run();
+  }
+
+  /** Checks that a write exposes its table id and options as display data. */
+  @Test
+  public void testWritingDisplayData() {
+    BigtableIO.Write configuredWrite =
+        BigtableIO.write().withTableId("fooTable").withBigtableOptions(BIGTABLE_OPTIONS);
+
+    DisplayData writeDisplayData = DisplayData.from(configuredWrite);
+
+    assertThat(writeDisplayData, hasDisplayItem("tableId", "fooTable"));
+    // BigtableIO adds user-agent to options; assert only on key and not value.
+    assertThat(writeDisplayData, hasDisplayItem(hasKey("bigtableOptions")));
+  }
+
+  ////////////////////////////////////////////////////////////////////////////////////////////
+  // Fixed family name and column qualifier used by makeRow for every generated test row.
+  private static final String COLUMN_FAMILY_NAME = "family";
+  private static final ByteString COLUMN_NAME = ByteString.copyFromUtf8("column");
+  // Cell-less column and column-less family templates; makeRow copies them via toBuilder().
+  private static final Column TEST_COLUMN = Column.newBuilder().setQualifier(COLUMN_NAME).build();
+  private static final Family TEST_FAMILY = Family.newBuilder().setName(COLUMN_FAMILY_NAME).build();
+
+  /**
+   * Builds the {@link Row} that a read is expected to return for the given key and value: one
+   * cell holding {@code value}, in the test column of the test family.
+   */
+  private static Row makeRow(ByteString key, ByteString value) {
+    Cell.Builder cell = Cell.newBuilder().setValue(value);
+    Column.Builder column = TEST_COLUMN.toBuilder().addCells(cell);
+    Family.Builder family = TEST_FAMILY.toBuilder().addColumns(column);
+    return Row.newBuilder().setKey(key).addFamilies(family).build();
+  }
+
+  /**
+   * Creates table {@code tableId} in the fake service, fills it with {@code numRows} generated
+   * key/value pairs, and returns the corresponding rows in generation order.
+   */
+  private static List<Row> makeTableData(String tableId, int numRows) {
+    service.createTable(tableId);
+    Map<ByteString, ByteString> tableContents = service.getTable(tableId);
+
+    List<Row> createdRows = new ArrayList<>(numRows);
+    int index = 0;
+    while (index < numRows) {
+      // Zero-padded to nine digits so that every key and value has the same length.
+      ByteString rowKey = ByteString.copyFromUtf8(String.format("key%09d", index));
+      ByteString rowValue = ByteString.copyFromUtf8(String.format("value%09d", index));
+      tableContents.put(rowKey, rowValue);
+      createdRows.add(makeRow(rowKey, rowValue));
+      index++;
+    }
+
+    return createdRows;
+  }
+
+
+  /**
+   * A {@link BigtableService} implementation that stores tables and their contents in memory.
+   *
+   * <p>Each table is a sorted map from row key to a single cell value. Sample row keys are kept
+   * separately and exist only for tables on which {@link #setupSampleRowKeys} has been called.
+   */
+  private static class FakeBigtableService implements BigtableService {
+    // Table id -> table contents (row key -> value), ordered by ByteStringComparator.
+    private final Map<String, SortedMap<ByteString, ByteString>> tables = new HashMap<>();
+    // Table id -> samples registered by setupSampleRowKeys.
+    private final Map<String, List<SampleRowKeysResponse>> sampleRowKeys = new HashMap<>();
+
+    /** Returns the live contents of the given table, or {@code null} if it was never created. */
+    @Nullable
+    public SortedMap<ByteString, ByteString> getTable(String tableId) {
+      return tables.get(tableId);
+    }
+
+    /**
+     * Returns the key range built from the first and last row keys of the given table.
+     *
+     * <p>Fails with {@link IllegalArgumentException} if the table does not exist. The table must
+     * be non-empty, because {@code firstKey()} and {@code lastKey()} throw on an empty map.
+     */
+    public ByteKeyRange getTableRange(String tableId) {
+      verifyTableExists(tableId);
+      SortedMap<ByteString, ByteString> data = tables.get(tableId);
+      // NOTE(review): if ByteKeyRange treats its end key as exclusive, the last row of the table
+      // lies outside the returned range - confirm that this is intended.
+      return ByteKeyRange.of(ByteKey.of(data.firstKey()), ByteKey.of(data.lastKey()));
+    }
+
+    /** Creates an empty table with the given id, replacing any existing table of that id. */
+    public void createTable(String tableId) {
+      tables.put(tableId, new TreeMap<ByteString, ByteString>(new ByteStringComparator()));
+    }
+
+    @Override
+    public boolean tableExists(String tableId) {
+      return tables.containsKey(tableId);
+    }
+
+    /** Throws {@link IllegalArgumentException} unless the given table has been created. */
+    public void verifyTableExists(String tableId) {
+      checkArgument(tableExists(tableId), "Table %s does not exist", tableId);
+    }
+
+    @Override
+    public FakeBigtableReader createReader(BigtableSource source) {
+      return new FakeBigtableReader(source);
+    }
+
+    @Override
+    public FakeBigtableWriter openForWriting(String tableId) {
+      return new FakeBigtableWriter(tableId);
+    }
+
+    /**
+     * Returns the samples previously registered for the source's table.
+     *
+     * <p>Fails with {@link IllegalArgumentException} if {@link #setupSampleRowKeys} was never
+     * called for that table.
+     */
+    @Override
+    public List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) {
+      List<SampleRowKeysResponse> samples = sampleRowKeys.get(source.getTableId());
+      checkArgument(samples != null, "No samples found for table %s", source.getTableId());
+      return samples;
+    }
+
+    /**
+     * Sets up the sample row keys for the specified table.
+     *
+     * <p>Walks the rows in key order and emits a sample at the first row whose position, as a
+     * fraction of the table, has reached the next multiple of {@code 1 / numSamples}. A final
+     * sample with no row key marks the end of the table. Each row is assumed to occupy exactly
+     * {@code bytesPerRow} bytes.
+     *
+     * @param tableId the table to sample; it must already exist
+     * @param numSamples the number of equal fractions to divide the table into; must be positive
+     * @param bytesPerRow the size attributed to every row; must be positive
+     */
+    void setupSampleRowKeys(String tableId, int numSamples, long bytesPerRow) {
+      verifyTableExists(tableId);
+      checkArgument(numSamples > 0, "Number of samples must be positive: %s", numSamples);
+      checkArgument(bytesPerRow > 0, "Bytes/Row must be positive: %s", bytesPerRow);
+
+      ImmutableList.Builder<SampleRowKeysResponse> ret = ImmutableList.builder();
+      SortedMap<ByteString, ByteString> rows = getTable(tableId);
+      // Index (1-based) of the next fraction boundary, currentSample / numSamples, to be emitted.
+      int currentSample = 1;
+      // Number of rows that sort strictly before the entry being visited.
+      int rowsSoFar = 0;
+      for (Map.Entry<ByteString, ByteString> entry : rows.entrySet()) {
+        // At most one sample is emitted per row, even if several boundaries have been passed.
+        if (((double) rowsSoFar) / rows.size() >= ((double) currentSample) / numSamples) {
+          // add the sample with the total number of bytes in the table before this key.
+          ret.add(
+              SampleRowKeysResponse.newBuilder()
+                  .setRowKey(entry.getKey())
+                  .setOffsetBytes(rowsSoFar * bytesPerRow)
+                  .build());
+          // Move on to next sample
+          currentSample++;
+        }
+        ++rowsSoFar;
+      }
+
+      // Add the last sample indicating the end of the table, with all rows before it.
+      ret.add(SampleRowKeysResponse.newBuilder().setOffsetBytes(rows.size() * bytesPerRow).build());
+      sampleRowKeys.put(tableId, ret.build());
+    }
+  }
+
+  /**
+   * A {@link BigtableService.Reader} implementation that reads from the static instance of
+   * {@link FakeBigtableService} stored in {@link #service}.
+   *
+   * <p>The only {@link RowFilter} this reader supports is a row-key regex filter; a source with
+   * any other kind of filter is rejected at construction. Rows outside the source's key range
+   * are skipped.
+   */
+  private static class FakeBigtableReader implements BigtableService.Reader {
+    private final BigtableSource source;
+    private Iterator<Map.Entry<ByteString, ByteString>> rows;
+    private Row currentRow;
+    private Predicate<ByteString> filter;
+
+    public FakeBigtableReader(BigtableSource source) {
+      this.source = source;
+      RowFilter rowFilter = source.getRowFilter();
+      if (rowFilter != null) {
+        ByteString keyRegex = rowFilter.getRowKeyRegexFilter();
+        checkArgument(!keyRegex.isEmpty(), "Only RowKeyRegexFilter is supported");
+        filter = new KeyMatchesRegex(keyRegex.toStringUtf8());
+      } else {
+        filter = Predicates.alwaysTrue();
+      }
+      service.verifyTableExists(source.getTableId());
+    }
+
+    @Override
+    public boolean start() {
+      SortedMap<ByteString, ByteString> table = service.getTable(source.getTableId());
+      rows = table.entrySet().iterator();
+      return advance();
+    }
+
+    @Override
+    public boolean advance() {
+      // Consume entries until one passes both the row filter and the source's key range.
+      while (rows.hasNext()) {
+        Map.Entry<ByteString, ByteString> candidate = rows.next();
+        ByteString key = candidate.getKey();
+        if (filter.apply(key) && source.getRange().containsKey(ByteKey.of(key))) {
+          currentRow = makeRow(key, candidate.getValue());
+          return true;
+        }
+      }
+
+      // The iterator is exhausted: there is no current row any more.
+      currentRow = null;
+      return false;
+    }
+
+    @Override
+    public Row getCurrentRow() {
+      if (currentRow != null) {
+        return currentRow;
+      }
+      throw new NoSuchElementException();
+    }
+
+    @Override
+    public void close() {
+      currentRow = null;
+      rows = null;
+    }
+  }
+
+  /**
+   * A {@link BigtableService.Writer} implementation that writes to the static instance of
+   * {@link FakeBigtableService} stored in {@link #service}.
+   *
+   * <p>Only the value of each mutation's {@link SetCell} is used; it is stored under the
+   * record's row key, so the last mutation of a record determines the stored value.
+   *
+   * <p>A mutation whose {@link SetCell} value is empty fails the write: the returned
+   * {@link ListenableFuture} carries an {@link IOException}. Mutations of the same record that
+   * precede the failing one have already been applied at that point.
+   */
+  private static class FakeBigtableWriter implements BigtableService.Writer {
+    private final String tableId;
+
+    public FakeBigtableWriter(String tableId) {
+      this.tableId = tableId;
+    }
+
+    @Override
+    public ListenableFuture<Empty> writeRecord(KV<ByteString, Iterable<Mutation>> record) {
+      service.verifyTableExists(tableId);
+      Map<ByteString, ByteString> destination = service.getTable(tableId);
+      ByteString rowKey = record.getKey();
+      for (Mutation mutation : record.getValue()) {
+        ByteString cellValue = mutation.getSetCell().getValue();
+        if (cellValue.isEmpty()) {
+          return Futures.immediateFailedCheckedFuture(new IOException("cell value missing"));
+        }
+        destination.put(rowKey, cellValue);
+      }
+      return Futures.immediateFuture(Empty.getDefaultInstance());
+    }
+
+    @Override
+    public void close() {}
+  }
+
+  /**
+   * A serializable comparator that orders {@link ByteString} values by comparing their
+   * {@link ByteKey} equivalents. Used to keep the rows of the fake tables sorted.
+   */
+  private static final class ByteStringComparator implements Comparator<ByteString>, Serializable {
+    @Override
+    public int compare(ByteString o1, ByteString o2) {
+      ByteKey left = ByteKey.of(o1);
+      ByteKey right = ByteKey.of(o2);
+      return left.compareTo(right);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d138ae54/sdks/java/io/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/pom.xml b/sdks/java/io/pom.xml
index 95d1f55..0027e0e 100644
--- a/sdks/java/io/pom.xml
+++ b/sdks/java/io/pom.xml
@@ -35,6 +35,7 @@
   (sources and sinks) to consume and produce data from systems.</description>
 
   <modules>
+    <module>google-cloud-platform</module>
     <module>hdfs</module>
     <module>kafka</module>
   </modules>


Mime
View raw message