beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] beam git commit: BigtableIO should use AutoValue for read and write
Date Fri, 12 May 2017 18:02:34 GMT
Repository: beam
Updated Branches:
  refs/heads/master 13c06bf79 -> 42066431b


BigtableIO should use AutoValue for read and write


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e8d82981
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e8d82981
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e8d82981

Branch: refs/heads/master
Commit: e8d829817e3f7e07884a91737b61bfa5133d5724
Parents: 13c06bf
Author: Borisa Zivkovic <borisa.zivkovic@huawei.com>
Authored: Fri May 12 08:44:20 2017 +0100
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Fri May 12 10:56:41 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigtable/BigtableIO.java    | 237 +++++++++----------
 1 file changed, 114 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e8d82981/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index bde7ea5..22e9f36 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.auto.value.AutoValue;
 import com.google.bigtable.v2.MutateRowResponse;
 import com.google.bigtable.v2.Mutation;
 import com.google.bigtable.v2.Row;
@@ -164,7 +165,8 @@ public class BigtableIO {
    */
   @Experimental
   public static Read read() {
-    return new Read(null, "", ByteKeyRange.ALL_KEYS, null, null);
+    return new AutoValue_BigtableIO_Read.Builder().setKeyRange(ByteKeyRange.ALL_KEYS).setTableId("")
+        .build();
   }
 
   /**
@@ -176,7 +178,7 @@ public class BigtableIO {
    */
   @Experimental
   public static Write write() {
-    return new Write(null, "", null);
+    return new AutoValue_BigtableIO_Write.Builder().setTableId("").build();
   }
 
   /**
@@ -186,7 +188,46 @@ public class BigtableIO {
    * @see BigtableIO
    */
   @Experimental
-  public static class Read extends PTransform<PBegin, PCollection<Row>> {
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, PCollection<Row>> {
+
+    @Nullable
+    abstract RowFilter getRowFilter();
+
+    /** Returns the range of keys that will be read from the table. */
+    @Nullable
+    public abstract ByteKeyRange getKeyRange();
+
+    /** Returns the table being read from. */
+    @Nullable
+    public abstract String getTableId();
+
+    @Nullable
+    abstract BigtableService getBigtableService();
+
+
+    /** Returns the Google Cloud Bigtable instance being read from, and other parameters. */
+    @Nullable
+    public abstract BigtableOptions getBigtableOptions();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setRowFilter(RowFilter filter);
+
+      abstract Builder setKeyRange(ByteKeyRange keyRange);
+
+      abstract Builder setTableId(String tableId);
+
+      abstract Builder setBigtableOptions(BigtableOptions options);
+
+      abstract Builder setBigtableService(BigtableService bigtableService);
+
+      abstract Read build();
+    }
+
     /**
      * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable instance
      * indicated by the given options, and using any other specified customizations.
@@ -217,7 +258,7 @@ public class BigtableIO {
       BigtableOptions optionsWithAgent =
           clonedBuilder.setUserAgent(getBeamSdkPartOfUserAgent()).build();
 
-      return new Read(optionsWithAgent, tableId, keyRange, filter, bigtableService);
+      return toBuilder().setBigtableOptions(optionsWithAgent).build();
     }
 
     /**
@@ -228,7 +269,7 @@ public class BigtableIO {
      */
     public Read withRowFilter(RowFilter filter) {
       checkNotNull(filter, "filter");
-      return new Read(options, tableId, keyRange, filter, bigtableService);
+      return toBuilder().setRowFilter(filter).build();
     }
 
     /**
@@ -238,7 +279,7 @@ public class BigtableIO {
      */
     public Read withKeyRange(ByteKeyRange keyRange) {
       checkNotNull(keyRange, "keyRange");
-      return new Read(options, tableId, keyRange, filter, bigtableService);
+      return toBuilder().setKeyRange(keyRange).build();
     }
 
     /**
@@ -248,29 +289,7 @@ public class BigtableIO {
      */
     public Read withTableId(String tableId) {
       checkNotNull(tableId, "tableId");
-      return new Read(options, tableId, keyRange, filter, bigtableService);
-    }
-
-    /**
-     * Returns the Google Cloud Bigtable instance being read from, and other parameters.
-     */
-    public BigtableOptions getBigtableOptions() {
-      return options;
-    }
-
-    /**
-     * Returns the range of keys that will be read from the table. By default, returns
-     * {@link ByteKeyRange#ALL_KEYS} to scan the entire table.
-     */
-    public ByteKeyRange getKeyRange() {
-      return keyRange;
-    }
-
-    /**
-     * Returns the table being read from.
-     */
-    public String getTableId() {
-      return tableId;
+      return toBuilder().setTableId(tableId).build();
     }
 
     @Override
@@ -281,21 +300,21 @@ public class BigtableIO {
             public BigtableService apply(PipelineOptions options) {
               return getBigtableService(options);
             }
-          }, tableId, filter, keyRange, null);
+          }, getTableId(), getRowFilter(), getKeyRange(), null);
       return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
     }
 
     @Override
     public void validate(PipelineOptions options) {
-      checkArgument(this.options != null, "BigtableOptions not specified");
-      checkArgument(!tableId.isEmpty(), "Table ID not specified");
+      checkArgument(getBigtableOptions() != null, "BigtableOptions not specified");
+      checkArgument(getTableId() != null && !getTableId().isEmpty(), "Table ID not specified");
       try {
         checkArgument(
-            getBigtableService(options).tableExists(tableId),
+            getBigtableService(options).tableExists(getTableId()),
             "Table %s does not exist",
-            tableId);
+            getTableId());
       } catch (IOException e) {
-        LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
+        LOG.warn("Error checking whether table {} exists; proceeding.", getTableId(), e);
       }
     }
 
@@ -303,19 +322,19 @@ public class BigtableIO {
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
 
-      builder.add(DisplayData.item("tableId", tableId)
+      builder.add(DisplayData.item("tableId", getTableId())
         .withLabel("Table ID"));
 
-      if (options != null) {
-        builder.add(DisplayData.item("bigtableOptions", options.toString())
+      if (getBigtableOptions() != null) {
+        builder.add(DisplayData.item("bigtableOptions", getBigtableOptions().toString())
           .withLabel("Bigtable Options"));
       }
 
       builder.addIfNotDefault(
-          DisplayData.item("keyRange", keyRange.toString()), ByteKeyRange.ALL_KEYS.toString());
+          DisplayData.item("keyRange", getKeyRange().toString()), ByteKeyRange.ALL_KEYS.toString());
 
-      if (filter != null) {
-        builder.add(DisplayData.item("rowFilter", filter.toString())
+      if (getRowFilter() != null) {
+        builder.add(DisplayData.item("rowFilter", getRowFilter().toString())
           .withLabel("Table Row Filter"));
       }
     }
@@ -323,38 +342,13 @@ public class BigtableIO {
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(Read.class)
-          .add("options", options)
-          .add("tableId", tableId)
-          .add("keyRange", keyRange)
-          .add("filter", filter)
+          .add("options", getBigtableOptions())
+          .add("tableId", getTableId())
+          .add("keyRange", getKeyRange())
+          .add("filter", getRowFilter())
           .toString();
     }
 
-    /////////////////////////////////////////////////////////////////////////////////////////
-    /**
-     * Used to define the Cloud Bigtable instance and any options for the networking layer.
-     * Cannot actually be {@code null} at validation time, but may start out {@code null} while
-     * source is being built.
-     */
-    @Nullable private final BigtableOptions options;
-    private final String tableId;
-    private final ByteKeyRange keyRange;
-    @Nullable private final RowFilter filter;
-    @Nullable private final BigtableService bigtableService;
-
-    private Read(
-        @Nullable BigtableOptions options,
-        String tableId,
-        ByteKeyRange keyRange,
-        @Nullable RowFilter filter,
-        @Nullable BigtableService bigtableService) {
-      this.options = options;
-      this.tableId = checkNotNull(tableId, "tableId");
-      this.keyRange = checkNotNull(keyRange, "keyRange");
-      this.filter = filter;
-      this.bigtableService = bigtableService;
-    }
-
     /**
      * Returns a new {@link BigtableIO.Read} that will read using the given Cloud Bigtable
      * service implementation.
@@ -365,7 +359,7 @@ public class BigtableIO {
      */
     Read withBigtableService(BigtableService bigtableService) {
       checkNotNull(bigtableService, "bigtableService");
-      return new Read(options, tableId, keyRange, filter, bigtableService);
+      return toBuilder().setBigtableService(bigtableService).build();
     }
 
     /**
@@ -378,11 +372,12 @@ public class BigtableIO {
      */
     @VisibleForTesting
     BigtableService getBigtableService(PipelineOptions pipelineOptions) {
-      if (bigtableService != null) {
-        return bigtableService;
+      if (getBigtableService() != null) {
+        return getBigtableService();
       }
-      BigtableOptions.Builder clonedOptions = options.toBuilder();
-      if (options.getCredentialOptions().getCredentialType() == CredentialType.DefaultCredentials) {
+      BigtableOptions.Builder clonedOptions = getBigtableOptions().toBuilder();
+      if (getBigtableOptions().getCredentialOptions()
+          .getCredentialType() == CredentialType.DefaultCredentials) {
         clonedOptions.setCredentialOptions(
             CredentialOptions.credential(
                 pipelineOptions.as(GcpOptions.class).getGcpCredential()));
@@ -398,24 +393,33 @@ public class BigtableIO {
    * @see BigtableIO
    */
   @Experimental
-  public static class Write
+  @AutoValue
+  public abstract static class Write
       extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>,
PDone> {
-    /**
-     * Used to define the Cloud Bigtable instance and any options for the networking layer.
-     * Cannot actually be {@code null} at validation time, but may start out {@code null} while
-     * source is being built.
-     */
-    @Nullable private final BigtableOptions options;
-    private final String tableId;
-    @Nullable private final BigtableService bigtableService;
 
-    private Write(
-        @Nullable BigtableOptions options,
-        String tableId,
-        @Nullable BigtableService bigtableService) {
-      this.options = options;
-      this.tableId = checkNotNull(tableId, "tableId");
-      this.bigtableService = bigtableService;
+    /** Returns the table being written to. */
+    @Nullable
+    abstract String getTableId();
+
+    @Nullable
+    abstract BigtableService getBigtableService();
+
+    /** Returns the Google Cloud Bigtable instance being written to, and other parameters. */
+    @Nullable
+    public abstract BigtableOptions getBigtableOptions();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+
+      abstract Builder setTableId(String tableId);
+
+      abstract Builder setBigtableOptions(BigtableOptions options);
+
+      abstract Builder setBigtableService(BigtableService bigtableService);
+
+      abstract Write build();
     }
 
     /**
@@ -452,7 +456,7 @@ public class BigtableIO {
           .setUseCachedDataPool(true);
       BigtableOptions optionsWithAgent =
           clonedBuilder.setUserAgent(getBeamSdkPartOfUserAgent()).build();
-      return new Write(optionsWithAgent, tableId, bigtableService);
+      return toBuilder().setBigtableOptions(optionsWithAgent).build();
     }
 
     /**
@@ -462,26 +466,12 @@ public class BigtableIO {
      */
     public Write withTableId(String tableId) {
       checkNotNull(tableId, "tableId");
-      return new Write(options, tableId, bigtableService);
-    }
-
-    /**
-     * Returns the Google Cloud Bigtable instance being written to, and other parameters.
-     */
-    public BigtableOptions getBigtableOptions() {
-      return options;
-    }
-
-    /**
-     * Returns the table being written to.
-     */
-    public String getTableId() {
-      return tableId;
+      return toBuilder().setTableId(tableId).build();
     }
 
     @Override
     public PDone expand(PCollection<KV<ByteString, Iterable<Mutation>>>
input) {
-      input.apply(ParDo.of(new BigtableWriterFn(tableId,
+      input.apply(ParDo.of(new BigtableWriterFn(getTableId(),
           new SerializableFunction<PipelineOptions, BigtableService>() {
         @Override
         public BigtableService apply(PipelineOptions options) {
@@ -493,15 +483,15 @@ public class BigtableIO {
 
     @Override
     public void validate(PipelineOptions options) {
-      checkArgument(this.options != null, "BigtableOptions not specified");
-      checkArgument(!tableId.isEmpty(), "Table ID not specified");
+      checkArgument(getBigtableOptions() != null, "BigtableOptions not specified");
+      checkArgument(getTableId() != null && !getTableId().isEmpty(), "Table ID not specified");
       try {
         checkArgument(
-            getBigtableService(options).tableExists(tableId),
+            getBigtableService(options).tableExists(getTableId()),
             "Table %s does not exist",
-            tableId);
+            getTableId());
       } catch (IOException e) {
-        LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e);
+        LOG.warn("Error checking whether table {} exists; proceeding.", getTableId(), e);
       }
     }
 
@@ -515,18 +505,18 @@ public class BigtableIO {
      */
     Write withBigtableService(BigtableService bigtableService) {
       checkNotNull(bigtableService, "bigtableService");
-      return new Write(options, tableId, bigtableService);
+      return toBuilder().setBigtableService(bigtableService).build();
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
 
-      builder.add(DisplayData.item("tableId", tableId)
+      builder.add(DisplayData.item("tableId", getTableId())
         .withLabel("Table ID"));
 
-      if (options != null) {
-        builder.add(DisplayData.item("bigtableOptions", options.toString())
+      if (getBigtableOptions() != null) {
+        builder.add(DisplayData.item("bigtableOptions", getBigtableOptions().toString())
           .withLabel("Bigtable Options"));
       }
     }
@@ -534,8 +524,8 @@ public class BigtableIO {
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(Write.class)
-          .add("options", options)
-          .add("tableId", tableId)
+          .add("options", getBigtableOptions())
+          .add("tableId", getTableId())
           .toString();
     }
 
@@ -549,11 +539,12 @@ public class BigtableIO {
      */
     @VisibleForTesting
     BigtableService getBigtableService(PipelineOptions pipelineOptions) {
-      if (bigtableService != null) {
-        return bigtableService;
+      if (getBigtableService() != null) {
+        return getBigtableService();
       }
-      BigtableOptions.Builder clonedOptions = options.toBuilder();
-      if (options.getCredentialOptions().getCredentialType() == CredentialType.DefaultCredentials) {
+      BigtableOptions.Builder clonedOptions = getBigtableOptions().toBuilder();
+      if (getBigtableOptions().getCredentialOptions()
+          .getCredentialType() == CredentialType.DefaultCredentials) {
         clonedOptions.setCredentialOptions(
             CredentialOptions.credential(
                 pipelineOptions.as(GcpOptions.class).getGcpCredential()));


Mime
View raw message