beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [2/2] beam git commit: Remove References to CloudObject from the Java Harness
Date Tue, 01 Aug 2017 21:22:41 GMT
Remove References to CloudObject from the Java Harness

Migrates to using the shared Runner API definitions.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/64cf18fc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/64cf18fc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/64cf18fc

Branch: refs/heads/master
Commit: 64cf18fcdb4237189a5212b6476bdadf73a2ac7f
Parents: 3c81766
Author: Thomas Groh <tgroh@google.com>
Authored: Wed Jul 26 15:34:23 2017 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Tue Aug 1 14:22:21 2017 -0700

----------------------------------------------------------------------
 .../beam/fn/harness/BeamFnDataReadRunner.java   | 27 ++++++++----------
 .../beam/fn/harness/BeamFnDataWriteRunner.java  | 22 ++++++---------
 .../fn/harness/BeamFnDataReadRunnerTest.java    | 28 +++++++++----------
 .../fn/harness/BeamFnDataWriteRunnerTest.java   | 24 ++++++----------
 .../beam/fn/harness/FnApiDoFnRunnerTest.java    | 29 --------------------
 5 files changed, 41 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/64cf18fc/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
index e2c17b0..1e611db 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
-import com.google.protobuf.BytesValue;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
@@ -35,8 +34,8 @@ import org.apache.beam.fn.harness.data.BeamFnDataClient;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.dataflow.util.CloudObject;
-import org.apache.beam.runners.dataflow.util.CloudObjects;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -91,8 +90,9 @@ public class BeamFnDataReadRunner<OutputT> {
           .setPrimitiveTransformReference(pTransformId)
           .setName(getOnlyElement(pTransform.getOutputsMap().keySet()))
           .build();
-      RunnerApi.Coder coderSpec = coders.get(pCollections.get(
-          getOnlyElement(pTransform.getOutputsMap().values())).getCoderId());
+      RunnerApi.Coder coderSpec =
+          coders.get(
+              pCollections.get(getOnlyElement(pTransform.getOutputsMap().values())).getCoderId());
       Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers =
           (Collection) pCollectionIdsToConsumers.get(
               getOnlyElement(pTransform.getOutputsMap().values()));
@@ -102,6 +102,7 @@ public class BeamFnDataReadRunner<OutputT> {
           processBundleInstructionId,
           target,
           coderSpec,
+          coders,
           beamFnDataClient,
           consumers);
       addStartFunction.accept(runner::registerInputLocation);
@@ -124,6 +125,7 @@ public class BeamFnDataReadRunner<OutputT> {
       Supplier<String> processBundleInstructionIdSupplier,
       BeamFnApi.Target inputTarget,
       RunnerApi.Coder coderSpec,
+      Map<String, RunnerApi.Coder> coders,
       BeamFnDataClient beamFnDataClientFactory,
       Collection<ThrowingConsumer<WindowedValue<OutputT>>> consumers)
           throws IOException {
@@ -137,17 +139,10 @@ public class BeamFnDataReadRunner<OutputT> {
     @SuppressWarnings("unchecked")
     Coder<WindowedValue<OutputT>> coder =
         (Coder<WindowedValue<OutputT>>)
-            CloudObjects.coderFromCloudObject(
-                CloudObject.fromSpec(
-                    OBJECT_MAPPER.readValue(
-                        coderSpec
-                            .getSpec()
-                            .getSpec()
-                            .getParameter()
-                            .unpack(BytesValue.class)
-                            .getValue()
-                            .newInput(),
-                        Map.class)));
+            CoderTranslation.fromProto(
+                coderSpec,
+                RehydratedComponents.forComponents(
+                    RunnerApi.Components.newBuilder().putAllCoders(coders).build()));
     this.coder = coder;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/64cf18fc/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
index eec4dfd..bbed753 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataWriteRunner.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
-import com.google.protobuf.BytesValue;
 import java.io.IOException;
 import java.util.Map;
 import java.util.function.Consumer;
@@ -34,8 +33,8 @@ import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.dataflow.util.CloudObject;
-import org.apache.beam.runners.dataflow.util.CloudObjects;
+import org.apache.beam.runners.core.construction.CoderTranslation;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -93,6 +92,7 @@ public class BeamFnDataWriteRunner<InputT> {
               processBundleInstructionId,
               target,
               coderSpec,
+              coders,
               beamFnDataClient);
       addStartFunction.accept(runner::registerForOutput);
       pCollectionIdsToConsumers.put(
@@ -117,6 +117,7 @@ public class BeamFnDataWriteRunner<InputT> {
       Supplier<String> processBundleInstructionIdSupplier,
       BeamFnApi.Target outputTarget,
       RunnerApi.Coder coderSpec,
+      Map<String, RunnerApi.Coder> coders,
       BeamFnDataClient beamFnDataClientFactory)
           throws IOException {
     this.apiServiceDescriptor = functionSpec.getParameter().unpack(BeamFnApi.RemoteGrpcPort.class)
@@ -128,17 +129,10 @@ public class BeamFnDataWriteRunner<InputT> {
     @SuppressWarnings("unchecked")
     Coder<WindowedValue<InputT>> coder =
         (Coder<WindowedValue<InputT>>)
-            CloudObjects.coderFromCloudObject(
-                CloudObject.fromSpec(
-                    OBJECT_MAPPER.readValue(
-                        coderSpec
-                            .getSpec()
-                            .getSpec()
-                            .getParameter()
-                            .unpack(BytesValue.class)
-                            .getValue()
-                            .newInput(),
-                        Map.class)));
+            CoderTranslation.fromProto(
+                coderSpec,
+                RehydratedComponents.forComponents(
+                    RunnerApi.Components.newBuilder().putAllCoders(coders).build()));
     this.coder = coder;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/64cf18fc/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
index a7c6666..d712f5f 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataReadRunnerTest.java
@@ -30,7 +30,6 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
@@ -39,8 +38,6 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -56,10 +53,11 @@ import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.harness.test.TestExecutors;
 import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService;
 import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.dataflow.util.CloudObjects;
+import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.MessageWithComponents;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -79,7 +77,6 @@ import org.mockito.MockitoAnnotations;
 @RunWith(JUnit4.class)
 public class BeamFnDataReadRunnerTest {
 
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
   private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
       .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
   private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder()
@@ -88,19 +85,19 @@ public class BeamFnDataReadRunnerTest {
       WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
   private static final String CODER_SPEC_ID = "string-coder-id";
   private static final RunnerApi.Coder CODER_SPEC;
+  private static final RunnerApi.Components COMPONENTS;
   private static final String URN = "urn:org.apache.beam:source:runner:0.1";
 
   static {
     try {
-      CODER_SPEC = RunnerApi.Coder.newBuilder().setSpec(
-          RunnerApi.SdkFunctionSpec.newBuilder().setSpec(
-              RunnerApi.FunctionSpec.newBuilder().setParameter(
-                  Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
-                      OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(CODER))))
-                      .build()))
-                  .build())
-              .build())
-          .build();
+      MessageWithComponents coderAndComponents = CoderTranslation.toProto(CODER);
+      CODER_SPEC = coderAndComponents.getCoder();
+      COMPONENTS =
+          coderAndComponents
+              .getComponents()
+              .toBuilder()
+              .putCoders(CODER_SPEC_ID, CODER_SPEC)
+              .build();
     } catch (IOException e) {
       throw new ExceptionInInitializerError(e);
     }
@@ -150,7 +147,7 @@ public class BeamFnDataReadRunnerTest {
         Suppliers.ofInstance(bundleId)::get,
         ImmutableMap.of("outputPC",
             RunnerApi.PCollection.newBuilder().setCoderId(CODER_SPEC_ID).build()),
-        ImmutableMap.of(CODER_SPEC_ID, CODER_SPEC),
+        COMPONENTS.getCodersMap(),
         consumers,
         startFunctions::add,
         finishFunctions::add);
@@ -200,6 +197,7 @@ public class BeamFnDataReadRunnerTest {
         bundleId::get,
         INPUT_TARGET,
         CODER_SPEC,
+        COMPONENTS.getCodersMap(),
         mockBeamFnDataClient,
         ImmutableList.of(valuesA::add, valuesB::add));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/64cf18fc/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
index 28838b1..0caf19e 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/BeamFnDataWriteRunnerTest.java
@@ -32,15 +32,12 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -53,10 +50,11 @@ import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.dataflow.util.CloudObjects;
+import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.MessageWithComponents;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -74,7 +72,6 @@ import org.mockito.MockitoAnnotations;
 @RunWith(JUnit4.class)
 public class BeamFnDataWriteRunnerTest {
 
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
   private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder()
       .setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
   private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder()
@@ -83,19 +80,15 @@ public class BeamFnDataWriteRunnerTest {
   private static final Coder<WindowedValue<String>> CODER =
       WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
   private static final RunnerApi.Coder CODER_SPEC;
+  private static final RunnerApi.Components COMPONENTS;
   private static final String URN = "urn:org.apache.beam:sink:runner:0.1";
 
   static {
     try {
-      CODER_SPEC = RunnerApi.Coder.newBuilder().setSpec(
-          RunnerApi.SdkFunctionSpec.newBuilder().setSpec(
-              RunnerApi.FunctionSpec.newBuilder().setParameter(
-                  Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
-                      OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(CODER))))
-                      .build()))
-                  .build())
-              .build())
-          .build();
+      MessageWithComponents coderAndComponents = CoderTranslation.toProto(CODER);
+      CODER_SPEC = coderAndComponents.getCoder();
+      COMPONENTS =
+          coderAndComponents.getComponents().toBuilder().putCoders(CODER_ID, CODER_SPEC).build();
     } catch (IOException e) {
       throw new ExceptionInInitializerError(e);
     }
@@ -140,7 +133,7 @@ public class BeamFnDataWriteRunnerTest {
         Suppliers.ofInstance(bundleId)::get,
         ImmutableMap.of("inputPC",
             RunnerApi.PCollection.newBuilder().setCoderId(CODER_ID).build()),
-        ImmutableMap.of(CODER_ID, CODER_SPEC),
+        COMPONENTS.getCodersMap(),
         consumers,
         startFunctions::add,
         finishFunctions::add);
@@ -201,6 +194,7 @@ public class BeamFnDataWriteRunnerTest {
         bundleId::get,
         OUTPUT_TARGET,
         CODER_SPEC,
+        COMPONENTS.getCodersMap(),
         mockBeamFnDataClient);
 
     // Process for bundle id 0

http://git-wip-us.apache.org/repos/asf/beam/blob/64cf18fc/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index 98362a2..e269bcc 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -25,7 +25,6 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
@@ -35,19 +34,14 @@ import com.google.common.collect.Multimap;
 import com.google.protobuf.Any;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.BytesValue;
-import com.google.protobuf.Message;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.ServiceLoader;
 import org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar;
 import org.apache.beam.fn.harness.fn.ThrowingConsumer;
 import org.apache.beam.fn.harness.fn.ThrowingRunnable;
 import org.apache.beam.runners.core.construction.ParDoTranslation;
-import org.apache.beam.runners.dataflow.util.CloudObjects;
 import org.apache.beam.runners.dataflow.util.DoFnInfo;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -66,28 +60,6 @@ import org.junit.runners.JUnit4;
 /** Tests for {@link FnApiDoFnRunner}. */
 @RunWith(JUnit4.class)
 public class FnApiDoFnRunnerTest {
-
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-  private static final Coder<WindowedValue<String>> STRING_CODER =
-      WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
-  private static final String STRING_CODER_SPEC_ID = "999L";
-  private static final RunnerApi.Coder STRING_CODER_SPEC;
-
-  static {
-    try {
-      STRING_CODER_SPEC = RunnerApi.Coder.newBuilder()
-          .setSpec(RunnerApi.SdkFunctionSpec.newBuilder()
-              .setSpec(RunnerApi.FunctionSpec.newBuilder()
-                  .setParameter(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
-                      OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(STRING_CODER))))
-                      .build())))
-              .build())
-          .build();
-    } catch (IOException e) {
-      throw new ExceptionInInitializerError(e);
-    }
-  }
-
   private static class TestDoFn extends DoFn<String, String> {
     private static final TupleTag<String> mainOutput = new TupleTag<>("mainOutput");
     private static final TupleTag<String> additionalOutput = new TupleTag<>("output");
@@ -117,7 +89,6 @@ public class FnApiDoFnRunnerTest {
    */
   @Test
   public void testCreatingAndProcessingDoFn() throws Exception {
-    Map<String, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC);
     String pTransformId = "pTransformId";
     String mainOutputId = "101";
     String additionalOutputId = "102";


Mime
View raw message