beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From taki...@apache.org
Subject [44/50] [abbrv] beam git commit: Use URNs, not Java classes, in immutability enforcements
Date Thu, 13 Jul 2017 03:06:56 GMT
Use URNs, not Java classes, in immutability enforcements


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6a61f154
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6a61f154
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6a61f154

Branch: refs/heads/DSL_SQL
Commit: 6a61f154bdbc02c4d84053e13d68158e065e8372
Parents: 8ae2a79
Author: Kenneth Knowles <klk@google.com>
Authored: Mon Jul 10 15:25:11 2017 -0700
Committer: Tyler Akidau <takidau@apache.org>
Committed: Wed Jul 12 20:01:02 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       | 21 ++++++++------------
 .../direct/ExecutorServiceParallelExecutor.java | 16 ++++++---------
 2 files changed, 14 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6a61f154/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 7a221c4..4621224 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -38,14 +38,11 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineRunner;
-import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
@@ -72,16 +69,17 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult>
{
     IMMUTABILITY {
       @Override
       public boolean appliesTo(PCollection<?> collection, DirectGraph graph) {
-        return CONTAINS_UDF.contains(graph.getProducer(collection).getTransform().getClass());
+        return CONTAINS_UDF.contains(
+            PTransformTranslation.urnForTransform(graph.getProducer(collection).getTransform()));
       }
     };
 
     /**
      * The set of {@link PTransform PTransforms} that execute a UDF. Useful for some enforcements.
      */
-    private static final Set<Class<? extends PTransform>> CONTAINS_UDF =
+    private static final Set<String> CONTAINS_UDF =
         ImmutableSet.of(
-            Read.Bounded.class, Read.Unbounded.class, ParDo.SingleOutput.class, MultiOutput.class);
+            PTransformTranslation.READ_TRANSFORM_URN, PTransformTranslation.PAR_DO_TRANSFORM_URN);
 
     public abstract boolean appliesTo(PCollection<?> collection, DirectGraph graph);
 
@@ -110,22 +108,19 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult>
{
       return bundleFactory;
     }
 
-    @SuppressWarnings("rawtypes")
-    private static Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+    private static Map<String, Collection<ModelEnforcementFactory>>
         defaultModelEnforcements(Set<Enforcement> enabledEnforcements) {
-      ImmutableMap.Builder<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
-          enforcements = ImmutableMap.builder();
+      ImmutableMap.Builder<String, Collection<ModelEnforcementFactory>> enforcements
=
+          ImmutableMap.builder();
       ImmutableList.Builder<ModelEnforcementFactory> enabledParDoEnforcements =
           ImmutableList.builder();
       if (enabledEnforcements.contains(Enforcement.IMMUTABILITY)) {
         enabledParDoEnforcements.add(ImmutabilityEnforcementFactory.create());
       }
       Collection<ModelEnforcementFactory> parDoEnforcements = enabledParDoEnforcements.build();
-      enforcements.put(ParDo.SingleOutput.class, parDoEnforcements);
-      enforcements.put(MultiOutput.class, parDoEnforcements);
+      enforcements.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, parDoEnforcements);
       return enforcements.build();
     }
-
   }
 
   ////////////////////////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/beam/blob/6a61f154/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 2f4d1f6..75e2562 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -49,11 +49,11 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
@@ -77,9 +77,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor
{
   private final DirectGraph graph;
   private final RootProviderRegistry rootProviderRegistry;
   private final TransformEvaluatorRegistry registry;
-  @SuppressWarnings("rawtypes")
-  private final Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
-      transformEnforcements;
+  private final Map<String, Collection<ModelEnforcementFactory>> transformEnforcements;
 
   private final EvaluationContext evaluationContext;
 
@@ -112,9 +110,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor
{
       DirectGraph graph,
       RootProviderRegistry rootProviderRegistry,
       TransformEvaluatorRegistry registry,
-      @SuppressWarnings("rawtypes")
-          Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
-              transformEnforcements,
+      Map<String, Collection<ModelEnforcementFactory>> transformEnforcements,
       EvaluationContext context) {
     return new ExecutorServiceParallelExecutor(
         targetParallelism,
@@ -130,8 +126,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor
{
       DirectGraph graph,
       RootProviderRegistry rootProviderRegistry,
       TransformEvaluatorRegistry registry,
-      @SuppressWarnings("rawtypes")
-      Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
transformEnforcements,
+      Map<String, Collection<ModelEnforcementFactory>> transformEnforcements,
       EvaluationContext context) {
     this.targetParallelism = targetParallelism;
     // Don't use Daemon threads for workers. The Pipeline should continue to execute even
if there
@@ -237,7 +232,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor
{
 
     Collection<ModelEnforcementFactory> enforcements =
         MoreObjects.firstNonNull(
-            transformEnforcements.get(transform.getTransform().getClass()),
+            transformEnforcements.get(
+                PTransformTranslation.urnForTransform(transform.getTransform())),
             Collections.<ModelEnforcementFactory>emptyList());
 
     TransformExecutor<T> callable =


Mime
View raw message