beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [2/2] beam git commit: Add PipelineOptionsTranslation
Date Wed, 20 Sep 2017 18:37:27 GMT
Add PipelineOptionsTranslation

This converts a PipelineOptions instance to and from a Protobuf Struct.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f7ebb620
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f7ebb620
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f7ebb620

Branch: refs/heads/master
Commit: f7ebb6201b5f6e0bb3c585733b6c934eef62c68b
Parents: a1835c6
Author: Thomas Groh <tgroh@google.com>
Authored: Tue Aug 15 13:22:01 2017 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Wed Sep 20 10:33:30 2017 -0700

----------------------------------------------------------------------
 .../PipelineOptionsTranslation.java             |  51 +++++++
 .../PipelineOptionsTranslationTest.java         | 143 +++++++++++++++++++
 2 files changed, 194 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f7ebb620/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java
new file mode 100644
index 0000000..4cdca61
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.Struct;
+import com.google.protobuf.util.JsonFormat;
+import java.io.IOException;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+
+/** Utilities for going to/from Runner API pipeline options. */
+public class PipelineOptionsTranslation {
+  private static final ObjectMapper MAPPER =
+      new ObjectMapper()
+          .registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+
+  /** Converts the provided {@link PipelineOptions} to a {@link Struct}. */
+  public static Struct toProto(PipelineOptions options) {
+    Struct.Builder builder = Struct.newBuilder();
+    try {
+      // The JSON format of a Protobuf Struct is the JSON object that is equivalent to that
struct
+      // (with values encoded in a standard json-codeable manner). See Beam PR 3719 for more.
+      JsonFormat.parser().merge(MAPPER.writeValueAsString(options), builder);
+      return builder.build();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Converts the provided {@link Struct} into {@link PipelineOptions}. */
+  public static PipelineOptions fromProto(Struct protoOptions) throws IOException {
+    return MAPPER.readValue(JsonFormat.printer().print(protoOptions), PipelineOptions.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/f7ebb620/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslationTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslationTest.java
new file mode 100644
index 0000000..eb59bac
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslationTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Struct;
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Tests for {@link PipelineOptionsTranslation}. */
+@RunWith(Enclosed.class)
+public class PipelineOptionsTranslationTest {
+  /** Tests that translations can round-trip through the proto format. */
+  @RunWith(Parameterized.class)
+  public static class ToFromProtoTest {
+    @Parameters(name = "{index}: {0}")
+    public static Iterable<? extends PipelineOptions> options() {
+      PipelineOptionsFactory.register(TestUnserializableOptions.class);
+      PipelineOptionsFactory.register(TestDefaultOptions.class);
+      PipelineOptionsFactory.register(TestOptions.class);
+      PipelineOptions emptyOptions = PipelineOptionsFactory.create();
+
+      TestUnserializableOptions withNonSerializable =
+          PipelineOptionsFactory.as(TestUnserializableOptions.class);
+      withNonSerializable.setUnserializable(new Object());
+
+      TestOptions withCustomField = PipelineOptionsFactory.as(TestOptions.class);
+      withCustomField.setExample(99);
+
+      PipelineOptions withSettings = PipelineOptionsFactory.create();
+      withSettings.as(ApplicationNameOptions.class).setAppName("my_app");
+      withSettings.setJobName("my_job");
+
+      PipelineOptions withParsedSettings =
+          PipelineOptionsFactory.fromArgs("--jobName=my_job --appName=my_app").create();
+
+      return ImmutableList.of(
+          emptyOptions, withNonSerializable, withCustomField, withSettings, withParsedSettings);
+    }
+
+    @Parameter(0)
+    public PipelineOptions options;
+
+    @Test
+    public void testToFromProto() throws Exception {
+      options.getOptionsId();
+      Struct originalStruct = PipelineOptionsTranslation.toProto(options);
+      PipelineOptions deserializedStruct = PipelineOptionsTranslation.fromProto(originalStruct);
+
+      Struct reserializedStruct = PipelineOptionsTranslation.toProto(deserializedStruct);
+      assertThat(reserializedStruct.getFieldsMap(), equalTo(originalStruct.getFieldsMap()));
+    }
+  }
+
+  /** Tests that translations contain the correct contents. */
+  @RunWith(JUnit4.class)
+  public static class TranslationTest {
+    @Test
+    public void customSettingsRetained() throws Exception {
+      TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
+      options.setExample(23);
+      Struct serialized = PipelineOptionsTranslation.toProto(options);
+      PipelineOptions deserialized = PipelineOptionsTranslation.fromProto(serialized);
+
+      assertThat(deserialized.as(TestOptions.class).getExample(), equalTo(23));
+    }
+
+    @Test
+    public void ignoredSettingsNotSerialized() throws Exception {
+      TestUnserializableOptions opts = PipelineOptionsFactory.as(TestUnserializableOptions.class);
+      opts.setUnserializable(new Object());
+
+      Struct serialized = PipelineOptionsTranslation.toProto(opts);
+      PipelineOptions deserialized = PipelineOptionsTranslation.fromProto(serialized);
+
+      assertThat(
+          deserialized.as(TestUnserializableOptions.class).getUnserializable(), is(nullValue()));
+    }
+
+    @Test
+    public void defaultsRestored() throws Exception {
+      Struct serialized =
+          PipelineOptionsTranslation.toProto(PipelineOptionsFactory.as(TestDefaultOptions.class));
+      PipelineOptions deserialized = PipelineOptionsTranslation.fromProto(serialized);
+
+      assertThat(deserialized.as(TestDefaultOptions.class).getDefault(), equalTo(19));
+    }
+  }
+
+  /** {@link PipelineOptions} with an unserializable option. */
+  public interface TestUnserializableOptions extends PipelineOptions {
+    @JsonIgnore
+    Object getUnserializable();
+
+    void setUnserializable(Object unserializable);
+  }
+
+  /** {@link PipelineOptions} with an default option. */
+  public interface TestDefaultOptions extends PipelineOptions {
+    @Default.Integer(19)
+    int getDefault();
+
+    void setDefault(int example);
+  }
+
+  /** {@link PipelineOptions} for testing. */
+  public interface TestOptions extends PipelineOptions {
+    int getExample();
+
+    void setExample(int example);
+  }
+}


Mime
View raw message