beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] beam git commit: Remove Aggregators from BigQuery and PubSub
Date Tue, 21 Mar 2017 00:15:50 GMT
Repository: beam
Updated Branches:
  refs/heads/master 8d240981b -> 4ffd43ed7


Remove Aggregators from BigQuery and PubSub


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/695936ff
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/695936ff
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/695936ff

Branch: refs/heads/master
Commit: 695936ffaac03799d4ee972fb99b73202582e7fa
Parents: 8d24098
Author: Pablo <pabloem@google.com>
Authored: Mon Mar 20 15:39:53 2017 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Mon Mar 20 17:15:35 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/PubsubUnboundedSink.java | 24 ++++++++------------
 .../beam/sdk/io/PubsubUnboundedSource.java      |  8 +++----
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 13 ++++-------
 3 files changed, 19 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/695936ff/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
index c726fd7..f41b5b7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java
@@ -45,15 +45,15 @@ import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.PubsubIO.PubsubMessage;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.windowing.AfterFirst;
@@ -164,8 +164,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
    * Convert elements to messages and shard them.
    */
   private static class ShardFn<T> extends DoFn<T, KV<Integer, OutgoingMessage>> {
-    private final Aggregator<Long, Long> elementCounter =
-        createAggregator("elements", Sum.ofLongs());
+    private final Counter elementCounter = Metrics.counter(ShardFn.class, "elements");
     private final Coder<T> elementCoder;
     private final int numShards;
     private final RecordIdMethod recordIdMethod;
@@ -181,7 +180,7 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
 
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
-      elementCounter.addValue(1L);
+      elementCounter.inc();
       byte[] elementBytes = null;
       Map<String, String> attributes = ImmutableMap.<String, String>of();
       if (formatFn != null) {
@@ -242,12 +241,9 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
     @Nullable
     private transient PubsubClient pubsubClient;
 
-    private final Aggregator<Long, Long> batchCounter =
-        createAggregator("batches", Sum.ofLongs());
-    private final Aggregator<Long, Long> elementCounter =
-        createAggregator("elements", Sum.ofLongs());
-    private final Aggregator<Long, Long> byteCounter =
-        createAggregator("bytes", Sum.ofLongs());
+    private final Counter batchCounter = Metrics.counter(WriterFn.class, "batches");
+    private final Counter elementCounter = Metrics.counter(WriterFn.class, "elements");
+    private final Counter byteCounter = Metrics.counter(WriterFn.class, "bytes");
 
     WriterFn(
         PubsubClientFactory pubsubFactory, ValueProvider<TopicPath> topic,
@@ -269,9 +265,9 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> {
       int n = pubsubClient.publish(topic.get(), messages);
       checkState(n == messages.size(), "Attempted to publish %s messages but %s were successful",
                  messages.size(), n);
-      batchCounter.addValue(1L);
-      elementCounter.addValue((long) messages.size());
-      byteCounter.addValue((long) bytes);
+      batchCounter.inc();
+      elementCounter.inc(messages.size());
+      byteCounter.inc(bytes);
     }
 
     @StartBundle

http://git-wip-us.apache.org/repos/asf/beam/blob/695936ff/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
index 6c8a788..90bcc76 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java
@@ -51,10 +51,11 @@ import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.PubsubIO.PubsubMessage;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PubsubOptions;
 import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -1169,8 +1170,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
   // ================================================================================
 
   private static class StatsFn<T> extends DoFn<T, T> {
-    private final Aggregator<Long, Long> elementCounter =
-        createAggregator("elements", Sum.ofLongs());
+    private final Counter elementCounter = Metrics.counter(StatsFn.class, "elements");
 
     private final PubsubClientFactory pubsubFactory;
     @Nullable
@@ -1198,7 +1198,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>>
 
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
-      elementCounter.addValue(1L);
+      elementCounter.inc();
       c.output(c.element());
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/695936ff/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index e9ea0e0..03e18e6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -86,6 +86,8 @@ import org.apache.beam.sdk.io.AvroSource;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -93,7 +95,6 @@ import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -102,7 +103,6 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -118,7 +118,6 @@ import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
 import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.Reshuffle;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.Transport;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
@@ -2436,7 +2435,6 @@ public class BigQueryIO {
   /**
    * Implementation of DoFn to perform streaming BigQuery write.
    */
-  @SystemDoFnInternal
   @VisibleForTesting
   static class StreamingWriteFn
       extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
@@ -2460,9 +2458,8 @@ public class BigQueryIO {
     private static Set<String> createdTables =
         Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
 
-    /** Tracks bytes written, exposed as "ByteCount" Counter. */
-    private Aggregator<Long, Long> byteCountAggregator =
-        createAggregator("ByteCount", Sum.ofLongs());
+    /** Tracks bytes written, exposed as "ByteCount" Metrics Counter. */
+    private Counter byteCounter = Metrics.counter(StreamingWriteFn.class, "ByteCount");
 
     /** Constructor. */
     StreamingWriteFn(@Nullable ValueProvider<TableSchema> schema,
@@ -2564,7 +2561,7 @@ public class BigQueryIO {
         try {
           long totalBytes = bqServices.getDatasetService(options).insertAll(
               tableReference, tableRows, uniqueIds);
-          byteCountAggregator.addValue(totalBytes);
+          byteCounter.inc(totalBytes);
         } catch (IOException e) {
           throw new RuntimeException(e);
         }


Mime
View raw message