beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [2/2] incubator-beam git commit: Use an int Sequence instead of a Random UUID for Aggregator IDs
Date Thu, 27 Oct 2016 23:28:40 GMT
Use an int Sequence instead of a Random UUID for Aggregator IDs

Aggregator IDs are used to ensure that an Aggregator's identity is
consistent across synchronization barriers. This is only relevant when
constructing the map of Step -> Aggregator to enable querying, as the
DoFns represented within the graph may be serialized. The identity has
no impact on the interaction between the runner and aggregator, which is
the responsibility of the ProcessContext object and
setupDelegateAggregators.

UUID#randomUUID uses a shared SecureRandom to create the bytes of the
UUID; SecureRandom#nextBytes is a synchronized method, so regardless of
the underlying source of randomness, only one random UUID can be
generated at a time. Instead, use an atomically increasing int to
identify aggregators. This should be sufficient for user-created
aggregators, and system aggregators should not care about the id.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dd854b1a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dd854b1a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dd854b1a

Branch: refs/heads/master
Commit: dd854b1a71770b9b452361e0d92e018b65f1b3e8
Parents: f2ec824
Author: Thomas Groh <tgroh@google.com>
Authored: Thu Oct 27 10:19:03 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Thu Oct 27 16:28:32 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/DelegatingAggregator.java  | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd854b1a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
index d92bb71..e03d3b1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java
@@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.base.MoreObjects;
 import java.io.Serializable;
 import java.util.Objects;
-import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 
 /**
@@ -37,7 +37,8 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
  */
 class DelegatingAggregator<AggInputT, AggOutputT>
     implements Aggregator<AggInputT, AggOutputT>, Serializable {
-  private final UUID id;
+  private static final AtomicInteger ID_GEN = new AtomicInteger();
+  private final int id;
 
   private final String name;
 
@@ -47,7 +48,7 @@ class DelegatingAggregator<AggInputT, AggOutputT>
 
   public DelegatingAggregator(String name,
       CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
-    this.id = UUID.randomUUID();
+    this.id = ID_GEN.getAndIncrement();
     this.name = checkNotNull(name, "name cannot be null");
     // Safe contravariant cast
     @SuppressWarnings("unchecked")


Mime
View raw message