beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [23/50] [abbrv] beam git commit: DirectRunner override matchers using Runner API
Date Mon, 12 Jun 2017 16:55:38 GMT
DirectRunner override matchers using Runner API


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8d90878
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8d90878
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8d90878

Branch: refs/heads/gearpump-runner
Commit: d8d9087877c01f1786271726a541fb3eeda7f939
Parents: ca7b9c2
Author: Kenneth Knowles <klk@google.com>
Authored: Thu May 25 06:31:16 2017 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu Jun 8 11:36:28 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d8d90878/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index dbd1ec4..136ccf3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
+import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -42,12 +43,9 @@ import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformOverride;
-import org.apache.beam.sdk.testing.TestStream;
-import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
@@ -230,33 +228,33 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult>
{
                 new WriteWithShardingFactory())) /* Uses a view internally. */
         .add(
             PTransformOverride.of(
-                PTransformMatchers.classEqualTo(CreatePCollectionView.class),
+                PTransformMatchers.urnEqualTo(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN),
                 new ViewOverrideFactory())) /* Uses pardos and GBKs */
         .add(
             PTransformOverride.of(
-                PTransformMatchers.classEqualTo(TestStream.class),
+                PTransformMatchers.urnEqualTo(PTransformTranslation.TEST_STREAM_TRANSFORM_URN),
                 new DirectTestStreamFactory(this))) /* primitive */
         // SplittableParMultiDo is implemented in terms of nonsplittable simple ParDos and
extra
         // primitives
         .add(
             PTransformOverride.of(
-                PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory()))
+                PTransformMatchers.splittableParDo(), new ParDoMultiOverrideFactory()))
         // state and timer pardos are implemented in terms of simple ParDos and extra primitives
         .add(
             PTransformOverride.of(
-                PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory()))
+                PTransformMatchers.stateOrTimerParDo(), new ParDoMultiOverrideFactory()))
         .add(
             PTransformOverride.of(
-                PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class),
+                PTransformMatchers.urnEqualTo(
+                    SplittableParDo.SPLITTABLE_PROCESS_KEYED_ELEMENTS_URN),
                 new SplittableParDoViaKeyedWorkItems.OverrideFactory()))
         .add(
             PTransformOverride.of(
-                PTransformMatchers.classEqualTo(
-                    SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.class),
+                PTransformMatchers.urnEqualTo(SplittableParDo.SPLITTABLE_GBKIKWI_URN),
                 new DirectGBKIntoKeyedWorkItemsOverrideFactory())) /* Returns a GBKO */
         .add(
             PTransformOverride.of(
-                PTransformMatchers.classEqualTo(GroupByKey.class),
+                PTransformMatchers.urnEqualTo(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN),
                 new DirectGroupByKeyOverrideFactory())) /* returns two chained primitives.
*/
         .build();
   }


Mime
View raw message