beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From taki...@apache.org
Subject [22/50] [abbrv] beam git commit: [BEAM-1347] Add DoFnRunner specific to Fn Api.
Date Thu, 13 Jul 2017 03:06:34 GMT
[BEAM-1347] Add DoFnRunner specific to Fn Api.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/78a39bd5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/78a39bd5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/78a39bd5

Branch: refs/heads/DSL_SQL
Commit: 78a39bd54136ad29a0c8b7fab2dfe895c502e4f5
Parents: 513ccdf
Author: Luke Cwik <lcwik@google.com>
Authored: Fri Jun 23 14:34:36 2017 -0700
Committer: Tyler Akidau <takidau@apache.org>
Committed: Wed Jul 12 20:01:00 2017 -0700

----------------------------------------------------------------------
 sdks/java/harness/pom.xml                       |  10 +
 .../beam/runners/core/FnApiDoFnRunner.java      | 483 ++++++++++++++++---
 .../beam/runners/core/FnApiDoFnRunnerTest.java  |   7 +-
 3 files changed, 438 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/78a39bd5/sdks/java/harness/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
index 9cfadc2..fe5c2f1 100644
--- a/sdks/java/harness/pom.xml
+++ b/sdks/java/harness/pom.xml
@@ -83,6 +83,11 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-construction-java</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
     </dependency>
 
@@ -150,6 +155,11 @@
     </dependency>
 
     <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/78a39bd5/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java
b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java
index adf735a..b3cf3a7 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/FnApiDoFnRunner.java
@@ -27,49 +27,59 @@ import com.google.common.collect.Multimap;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.BytesValue;
 import com.google.protobuf.InvalidProtocolBufferException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import org.apache.beam.fn.harness.data.BeamFnDataClient;
-import org.apache.beam.fn.harness.fake.FakeStepContext;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
-import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.dataflow.util.DoFnInfo;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.State;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.OnTimerContext;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Instant;
 
 /**
- * Classes associated with converting {@link RunnerApi.PTransform}s to {@link DoFnRunner}s.
- *
- * <p>TODO: Move DoFnRunners into SDK harness and merge the methods below into it removing
this
- * class.
+ * A {@link DoFnRunner} specific to integrating with the Fn Api. This is to remove the layers
+ * of abstraction caused by StateInternals/TimerInternals since they model state and timer
+ * concepts differently.
  */
-public class FnApiDoFnRunner {
-
-  private static final String URN = "urn:org.apache.beam:dofn:java:0.1";
-
-  /** A registrar which provides a factory to handle Java {@link DoFn}s. */
+public class FnApiDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT>
{
+  /**
+   * A registrar which provides a factory to handle Java {@link DoFn}s.
+   */
   @AutoService(PTransformRunnerFactory.Registrar.class)
   public static class Registrar implements
       PTransformRunnerFactory.Registrar {
 
     @Override
     public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
-      return ImmutableMap.of(URN, new Factory());
+      return ImmutableMap.of(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN, new Factory());
     }
   }
 
-  /** A factory for {@link DoFnRunner}s. */
+  /** A factory for {@link FnApiDoFnRunner}. */
   static class Factory<InputT, OutputT>
       implements PTransformRunnerFactory<DoFnRunner<InputT, OutputT>> {
 
@@ -105,9 +115,9 @@ public class FnApiDoFnRunner {
         throw new IllegalArgumentException(
             String.format("Unable to unwrap DoFn %s", pTransform.getSpec()), e);
       }
-      DoFnInfo<?, ?> doFnInfo =
-          (DoFnInfo<?, ?>)
-              SerializableUtils.deserializeFromByteArray(serializedFn.toByteArray(), "DoFnInfo");
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      DoFnInfo<InputT, OutputT> doFnInfo = (DoFnInfo) SerializableUtils.deserializeFromByteArray(
+          serializedFn.toByteArray(), "DoFnInfo");
 
       // Verify that the DoFnInfo tag to output map matches the output map on the PTransform.
       checkArgument(
@@ -119,54 +129,26 @@ public class FnApiDoFnRunner {
           doFnInfo.getOutputMap());
 
       ImmutableMultimap.Builder<TupleTag<?>,
-          ThrowingConsumer<WindowedValue<OutputT>>> tagToOutput =
+          ThrowingConsumer<WindowedValue<?>>> tagToOutputMapBuilder =
           ImmutableMultimap.builder();
       for (Map.Entry<Long, TupleTag<?>> entry : doFnInfo.getOutputMap().entrySet())
{
         @SuppressWarnings({"unchecked", "rawtypes"})
-        Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers
=
-            (Collection) outputMap.get(Long.toString(entry.getKey()));
-        tagToOutput.putAll(entry.getValue(), consumers);
+        Collection<ThrowingConsumer<WindowedValue<?>>> consumers =
+            outputMap.get(Long.toString(entry.getKey()));
+        tagToOutputMapBuilder.putAll(entry.getValue(), consumers);
       }
 
+      ImmutableMultimap<TupleTag<?>, ThrowingConsumer<WindowedValue<?>>>
tagToOutputMap =
+          tagToOutputMapBuilder.build();
+
       @SuppressWarnings({"unchecked", "rawtypes"})
-      Map<TupleTag<?>, Collection<ThrowingConsumer<WindowedValue<?>>>>
tagBasedOutputMap =
-          (Map) tagToOutput.build().asMap();
-
-      OutputManager outputManager =
-          new OutputManager() {
-            Map<TupleTag<?>, Collection<ThrowingConsumer<WindowedValue<?>>>>
tupleTagToOutput =
-                tagBasedOutputMap;
-
-            @Override
-            public <T> void output(TupleTag<T> tag, WindowedValue<T> output)
{
-              try {
-                Collection<ThrowingConsumer<WindowedValue<?>>> consumers
=
-                    tupleTagToOutput.get(tag);
-                if (consumers == null) {
-                    /* This is a normal case, e.g., if a DoFn has output but that output
is not
-                     * consumed. Drop the output. */
-                  return;
-                }
-                for (ThrowingConsumer<WindowedValue<?>> consumer : consumers)
{
-                  consumer.accept(output);
-                }
-              } catch (Throwable t) {
-                throw new RuntimeException(t);
-              }
-            }
-          };
-
-      @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
-      DoFnRunner<InputT, OutputT> runner =
-          DoFnRunners.simpleRunner(
-              pipelineOptions,
-              (DoFn) doFnInfo.getDoFn(),
-              NullSideInputReader.empty(), /* TODO */
-              outputManager,
-              (TupleTag) doFnInfo.getOutputMap().get(doFnInfo.getMainOutput()),
-              new ArrayList<>(doFnInfo.getOutputMap().values()),
-              new FakeStepContext(),
-              (WindowingStrategy) doFnInfo.getWindowingStrategy());
+      DoFnRunner<InputT, OutputT> runner = new FnApiDoFnRunner<>(
+          pipelineOptions,
+          doFnInfo.getDoFn(),
+          (Collection<ThrowingConsumer<WindowedValue<OutputT>>>) (Collection)
+              tagToOutputMap.get(doFnInfo.getOutputMap().get(doFnInfo.getMainOutput())),
+          tagToOutputMap,
+          doFnInfo.getWindowingStrategy());
 
       // Register the appropriate handlers.
       addStartFunction.accept(runner::startBundle);
@@ -179,4 +161,387 @@ public class FnApiDoFnRunner {
       return runner;
     }
   }
+
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+
+  private final PipelineOptions pipelineOptions;
+  private final DoFn<InputT, OutputT> doFn;
+  private final Collection<ThrowingConsumer<WindowedValue<OutputT>>> mainOutputConsumers;
+  private final Multimap<TupleTag<?>, ThrowingConsumer<WindowedValue<?>>>
outputMap;
+  private final DoFnInvoker<InputT, OutputT> doFnInvoker;
+  private final StartBundleContext startBundleContext;
+  private final ProcessBundleContext processBundleContext;
+  private final FinishBundleContext finishBundleContext;
+
+  /**
+   * The lifetime of this member is only valid during {@link #processElement(WindowedValue)}.
+   */
+  private WindowedValue<InputT> currentElement;
+
+  /**
+   * The lifetime of this member is only valid during {@link #processElement(WindowedValue)}.
+   */
+  private BoundedWindow currentWindow;
+
+  FnApiDoFnRunner(
+      PipelineOptions pipelineOptions,
+      DoFn<InputT, OutputT> doFn,
+      Collection<ThrowingConsumer<WindowedValue<OutputT>>> mainOutputConsumers,
+      Multimap<TupleTag<?>, ThrowingConsumer<WindowedValue<?>>> outputMap,
+      WindowingStrategy windowingStrategy) {
+    this.pipelineOptions = pipelineOptions;
+    this.doFn = doFn;
+    this.mainOutputConsumers = mainOutputConsumers;
+    this.outputMap = outputMap;
+    this.doFnInvoker = DoFnInvokers.invokerFor(doFn);
+    this.startBundleContext = new StartBundleContext();
+    this.processBundleContext = new ProcessBundleContext();
+    this.finishBundleContext = new FinishBundleContext();
+  }
+
+  @Override
+  public void startBundle() {
+    doFnInvoker.invokeStartBundle(startBundleContext);
+  }
+
+  @Override
+  public void processElement(WindowedValue<InputT> elem) {
+    currentElement = elem;
+    try {
+      Iterator<BoundedWindow> windowIterator =
+          (Iterator<BoundedWindow>) elem.getWindows().iterator();
+      while (windowIterator.hasNext()) {
+        currentWindow = windowIterator.next();
+        doFnInvoker.invokeProcessElement(processBundleContext);
+      }
+    } finally {
+      currentElement = null;
+      currentWindow = null;
+    }
+  }
+
+  @Override
+  public void onTimer(
+      String timerId,
+      BoundedWindow window,
+      Instant timestamp,
+      TimeDomain timeDomain) {
+    throw new UnsupportedOperationException("TODO: Add support for timers");
+  }
+
+  @Override
+  public void finishBundle() {
+    doFnInvoker.invokeFinishBundle(finishBundleContext);
+  }
+
+  /**
+   * Outputs the given element to the specified set of consumers wrapping any exceptions.
+   */
+  private <T> void outputTo(
+      Collection<ThrowingConsumer<WindowedValue<T>>> consumers,
+      WindowedValue<T> output) {
+    Iterator<ThrowingConsumer<WindowedValue<T>>> consumerIterator;
+    try {
+      for (ThrowingConsumer<WindowedValue<T>> consumer : consumers) {
+        consumer.accept(output);
+      }
+    } catch (Throwable t) {
+      throw UserCodeException.wrap(t);
+    }
+  }
+
+  /**
+   * Provides arguments for a {@link DoFnInvoker} for {@link DoFn.StartBundle @StartBundle}.
+   */
+  private class StartBundleContext
+      extends DoFn<InputT, OutputT>.StartBundleContext
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+    private StartBundleContext() {
+      doFn.super();
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return pipelineOptions;
+    }
+
+    @Override
+    public PipelineOptions pipelineOptions() {
+      return pipelineOptions;
+    }
+
+    @Override
+    public BoundedWindow window() {
+      throw new UnsupportedOperationException(
+          "Cannot access window outside of @ProcessElement and @OnTimer methods.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(
+        DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
+        DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access FinishBundleContext outside of @FinishBundle method.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT>
doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access ProcessContext outside of @ProcessElement method.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT>
doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access OnTimerContext outside of @OnTimer methods.");
+    }
+
+    @Override
+    public RestrictionTracker<?> restrictionTracker() {
+      throw new UnsupportedOperationException(
+          "Cannot access RestrictionTracker outside of @ProcessElement method.");
+    }
+
+    @Override
+    public State state(String stateId) {
+      throw new UnsupportedOperationException(
+          "Cannot access state outside of @ProcessElement and @OnTimer methods.");
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      throw new UnsupportedOperationException(
+          "Cannot access timers outside of @ProcessElement and @OnTimer methods.");
+    }
+  }
+
+  /**
+   * Provides arguments for a {@link DoFnInvoker} for {@link DoFn.ProcessElement @ProcessElement}.
+   */
+  private class ProcessBundleContext
+      extends DoFn<InputT, OutputT>.ProcessContext
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+    private ProcessBundleContext() {
+      doFn.super();
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return currentWindow;
+    }
+
+    @Override
+    public DoFn.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access StartBundleContext outside of @StartBundle method.");
+    }
+
+    @Override
+    public DoFn.FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn)
{
+      throw new UnsupportedOperationException(
+          "Cannot access FinishBundleContext outside of @FinishBundle method.");
+    }
+
+    @Override
+    public ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException("TODO: Add support for timers");
+    }
+
+    @Override
+    public RestrictionTracker<?> restrictionTracker() {
+      throw new UnsupportedOperationException("TODO: Add support for SplittableDoFn");
+    }
+
+    @Override
+    public State state(String stateId) {
+      throw new UnsupportedOperationException("TODO: Add support for state");
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      throw new UnsupportedOperationException("TODO: Add support for timers");
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return pipelineOptions;
+    }
+
+    @Override
+    public PipelineOptions pipelineOptions() {
+      return pipelineOptions;
+    }
+
+    @Override
+    public void output(OutputT output) {
+      outputTo(mainOutputConsumers,
+          WindowedValue.of(
+              output,
+              currentElement.getTimestamp(),
+              currentWindow,
+              currentElement.getPane()));
+    }
+
+    @Override
+    public void outputWithTimestamp(OutputT output, Instant timestamp) {
+      outputTo(mainOutputConsumers,
+          WindowedValue.of(
+              output,
+              timestamp,
+              currentWindow,
+              currentElement.getPane()));
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, T output) {
+      Collection<ThrowingConsumer<WindowedValue<T>>> consumers = (Collection)
outputMap.get(tag);
+      if (consumers == null) {
+        throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
+      }
+      outputTo(consumers,
+          WindowedValue.of(
+              output,
+              currentElement.getTimestamp(),
+              currentWindow,
+              currentElement.getPane()));
+    }
+
+    @Override
+    public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp)
{
+      Collection<ThrowingConsumer<WindowedValue<T>>> consumers = (Collection)
outputMap.get(tag);
+      if (consumers == null) {
+        throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
+      }
+      outputTo(consumers,
+          WindowedValue.of(
+              output,
+              timestamp,
+              currentWindow,
+              currentElement.getPane()));
+    }
+
+    @Override
+    public InputT element() {
+      return currentElement.getValue();
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      throw new UnsupportedOperationException("TODO: Support side inputs");
+    }
+
+    @Override
+    public Instant timestamp() {
+      return currentElement.getTimestamp();
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return currentElement.getPane();
+    }
+
+    @Override
+    public void updateWatermark(Instant watermark) {
+      throw new UnsupportedOperationException("TODO: Add support for SplittableDoFn");
+    }
+  }
+
+  /**
+   * Provides arguments for a {@link DoFnInvoker} for {@link DoFn.FinishBundle @FinishBundle}.
+   */
+  private class FinishBundleContext
+      extends DoFn<InputT, OutputT>.FinishBundleContext
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
+
+    private FinishBundleContext() {
+      doFn.super();
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return pipelineOptions;
+    }
+
+    @Override
+    public PipelineOptions pipelineOptions() {
+      return pipelineOptions;
+    }
+
+    @Override
+    public BoundedWindow window() {
+      throw new UnsupportedOperationException(
+          "Cannot access window outside of @ProcessElement and @OnTimer methods.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(
+        DoFn<InputT, OutputT> doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access StartBundleContext outside of @StartBundle method.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
+        DoFn<InputT, OutputT> doFn) {
+      return this;
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT>
doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access ProcessContext outside of @ProcessElement method.");
+    }
+
+    @Override
+    public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT>
doFn) {
+      throw new UnsupportedOperationException(
+          "Cannot access OnTimerContext outside of @OnTimer methods.");
+    }
+
+    @Override
+    public RestrictionTracker<?> restrictionTracker() {
+      throw new UnsupportedOperationException(
+          "Cannot access RestrictionTracker outside of @ProcessElement method.");
+    }
+
+    @Override
+    public State state(String stateId) {
+      throw new UnsupportedOperationException(
+          "Cannot access state outside of @ProcessElement and @OnTimer methods.");
+    }
+
+    @Override
+    public Timer timer(String timerId) {
+      throw new UnsupportedOperationException(
+          "Cannot access timers outside of @ProcessElement and @OnTimer methods.");
+    }
+
+    @Override
+    public void output(OutputT output, Instant timestamp, BoundedWindow window) {
+      outputTo(mainOutputConsumers,
+          WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING));
+    }
+
+    @Override
+    public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow
window) {
+      Collection<ThrowingConsumer<WindowedValue<T>>> consumers = (Collection)
outputMap.get(tag);
+      if (consumers == null) {
+        throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
+      }
+      outputTo(consumers,
+          WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/78a39bd5/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java
index ae5cbac..c4df77a 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/FnApiDoFnRunnerTest.java
@@ -44,6 +44,7 @@ import java.util.ServiceLoader;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.runners.core.PTransformRunnerFactory.Registrar;
+import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.dataflow.util.CloudObjects;
 import org.apache.beam.runners.dataflow.util.DoFnInfo;
 import org.apache.beam.sdk.coders.Coder;
@@ -71,7 +72,6 @@ public class FnApiDoFnRunnerTest {
       WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
   private static final String STRING_CODER_SPEC_ID = "999L";
   private static final RunnerApi.Coder STRING_CODER_SPEC;
-  private static final String URN = "urn:org.apache.beam:dofn:java:0.1";
 
   static {
     try {
@@ -132,7 +132,7 @@ public class FnApiDoFnRunnerTest {
             Long.parseLong(mainOutputId), TestDoFn.mainOutput,
             Long.parseLong(additionalOutputId), TestDoFn.additionalOutput));
     RunnerApi.FunctionSpec functionSpec = RunnerApi.FunctionSpec.newBuilder()
-        .setUrn("urn:org.apache.beam:dofn:java:0.1")
+        .setUrn(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN)
         .setParameter(Any.pack(BytesValue.newBuilder()
             .setValue(ByteString.copyFrom(SerializableUtils.serializeToByteArray(doFnInfo)))
             .build()))
@@ -200,7 +200,8 @@ public class FnApiDoFnRunnerTest {
     for (Registrar registrar :
         ServiceLoader.load(Registrar.class)) {
       if (registrar instanceof FnApiDoFnRunner.Registrar) {
-        assertThat(registrar.getPTransformRunnerFactories(), IsMapContaining.hasKey(URN));
+        assertThat(registrar.getPTransformRunnerFactories(),
+            IsMapContaining.hasKey(ParDoTranslation.CUSTOM_JAVA_DO_FN_URN));
         return;
       }
     }


Mime
View raw message