beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [2/4] beam git commit: Introduces SerializablePipelineOptions in core-construction
Date Wed, 02 Aug 2017 18:17:10 GMT
Introduces SerializablePipelineOptions in core-construction

Removes analogous classes from spark/flink and their tests.

The analogous class in Spark was SparkRuntimeContext, which
also contained a CoderRegistry, but the CoderRegistry was used
only in a class that was itself unused. I removed that class.

This also allows removing a bunch of Jackson dependencies
from Spark runner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7db051ae
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7db051ae
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7db051ae

Branch: refs/heads/master
Commit: 7db051aeae2b8e6b2dbfcc1da31410ec118299f6
Parents: ff4b36c
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Fri Jul 28 12:48:41 2017 -0700
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Wed Aug 2 11:04:50 2017 -0700

----------------------------------------------------------------------
 runners/apex/pom.xml                            |   8 -
 .../operators/ApexGroupByKeyOperator.java       |   6 +-
 .../operators/ApexParDoOperator.java            |   6 +-
 .../ApexReadUnboundedInputOperator.java         |   6 +-
 .../utils/SerializablePipelineOptions.java      |  78 ---------
 .../translation/utils/PipelineOptionsTest.java  | 150 -----------------
 runners/core-construction-java/pom.xml          |  15 ++
 .../SerializablePipelineOptions.java            |  74 +++++++++
 .../SerializablePipelineOptionsTest.java        |  89 ++++++++++
 runners/flink/pom.xml                           |  10 --
 .../functions/FlinkDoFnFunction.java            |  10 +-
 .../FlinkMergingNonShuffleReduceFunction.java   |   8 +-
 .../functions/FlinkPartialReduceFunction.java   |   8 +-
 .../functions/FlinkReduceFunction.java          |   8 +-
 .../functions/FlinkStatefulDoFnFunction.java    |  10 +-
 .../utils/SerializedPipelineOptions.java        |  77 ---------
 .../translation/wrappers/SourceInputFormat.java |  10 +-
 .../wrappers/streaming/DoFnOperator.java        |  10 +-
 .../streaming/SplittableDoFnOperator.java       |   2 +-
 .../streaming/io/BoundedSourceWrapper.java      |  10 +-
 .../streaming/io/UnboundedSourceWrapper.java    |  12 +-
 .../beam/runners/flink/PipelineOptionsTest.java | 165 +------------------
 runners/spark/pom.xml                           |  12 --
 .../spark/aggregators/NamedAggregators.java     |  93 -----------
 .../beam/runners/spark/io/SourceDStream.java    |  20 +--
 .../apache/beam/runners/spark/io/SourceRDD.java |  22 +--
 .../runners/spark/io/SparkUnboundedSource.java  |   6 +-
 .../SparkGroupAlsoByWindowViaWindowSet.java     |  10 +-
 .../spark/stateful/StateSpecFunctions.java      |   8 +-
 .../spark/translation/EvaluationContext.java    |  11 +-
 .../spark/translation/MultiDoFnFunction.java    |  16 +-
 .../translation/SparkAbstractCombineFn.java     |   9 +-
 .../spark/translation/SparkGlobalCombineFn.java |   5 +-
 ...SparkGroupAlsoByWindowViaOutputBufferFn.java |   9 +-
 .../spark/translation/SparkKeyedCombineFn.java  |   5 +-
 .../spark/translation/SparkRuntimeContext.java  |  90 ----------
 .../spark/translation/TransformTranslator.java  |  27 ++-
 .../streaming/StreamingTransformTranslator.java |  20 +--
 .../translation/SparkRuntimeContextTest.java    | 122 --------------
 .../beam/sdk/options/PipelineOptions.java       |   7 +-
 .../apache/beam/sdk/options/ValueProviders.java |   8 +-
 41 files changed, 327 insertions(+), 945 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index fd5aafb..96aac8b 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -63,14 +63,6 @@
       <version>${apex.malhar.version}</version>
     </dependency>
     <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-core</artifactId>
-    </dependency>
-    <dependency>
-       <groupId>com.fasterxml.jackson.core</groupId>
-       <artifactId>jackson-databind</artifactId>
-    </dependency>
-    <dependency>
       <groupId>org.apache.apex</groupId>
       <artifactId>apex-engine</artifactId>
       <version>${apex.core.version}</version>

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 39f681f..5c0d72f 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -33,7 +33,6 @@ import java.util.Collections;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
-import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
 import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.ReduceFnRunner;
@@ -41,6 +40,7 @@ import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.construction.TriggerTranslation;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
@@ -149,7 +149,9 @@ public class ApexGroupByKeyOperator<K, V> implements Operator,
 
   @Override
   public void setup(OperatorContext context) {
-    this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(serializedOptions.get(), this);
+    this.traceTuples =
+        ApexStreamTuple.Logging.isDebugEnabled(
+            serializedOptions.get().as(ApexPipelineOptions.class), this);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index c3cbab2..4dc807d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -40,7 +40,6 @@ import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.NoOpStepContext;
-import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.StateInternalsProxy;
 import org.apache.beam.runners.apex.translation.utils.ValueAndCoderKryoSerializable;
 import org.apache.beam.runners.core.DoFnRunner;
@@ -64,6 +63,7 @@ import org.apache.beam.runners.core.StatefulDoFnRunner;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.TimerInternalsFactory;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.ListCoder;
@@ -386,7 +386,9 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
 
   @Override
   public void setup(OperatorContext context) {
-    this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
+    this.traceTuples =
+        ApexStreamTuple.Logging.isDebugEnabled(
+            pipelineOptions.get().as(ApexPipelineOptions.class), this);
     SideInputReader sideInputReader = NullSideInputReader.of(sideInputs);
     if (!sideInputs.isEmpty()) {
       sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals);

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
index 1549560..21fb9d2 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexReadUnboundedInputOperator.java
@@ -30,8 +30,8 @@ import java.io.IOException;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple.DataTuple;
-import org.apache.beam.runners.apex.translation.utils.SerializablePipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.ValuesSource;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -119,7 +119,9 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT
 
   @Override
   public void setup(OperatorContext context) {
-    this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
+    this.traceTuples =
+        ApexStreamTuple.Logging.isDebugEnabled(
+            pipelineOptions.get().as(ApexPipelineOptions.class), this);
     try {
       reader = source.createReader(this.pipelineOptions.get(), null);
       available = reader.start();

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
deleted file mode 100644
index 46b04fc..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/SerializablePipelineOptions.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translation.utils;
-
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
-
-/**
- * A wrapper to enable serialization of {@link PipelineOptions}.
- */
-public class SerializablePipelineOptions implements Externalizable {
-
-  /* Used to ensure we initialize file systems exactly once, because it's a slow operation. */
-  private static final AtomicBoolean FILE_SYSTEMS_INTIIALIZED = new AtomicBoolean(false);
-
-  private transient ApexPipelineOptions pipelineOptions;
-
-  public SerializablePipelineOptions(ApexPipelineOptions pipelineOptions) {
-    this.pipelineOptions = pipelineOptions;
-  }
-
-  public SerializablePipelineOptions() {
-  }
-
-  public ApexPipelineOptions get() {
-    return this.pipelineOptions;
-  }
-
-  @Override
-  public void writeExternal(ObjectOutput out) throws IOException {
-    out.writeUTF(createMapper().writeValueAsString(pipelineOptions));
-  }
-
-  @Override
-  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-    String s = in.readUTF();
-    this.pipelineOptions = createMapper().readValue(s, PipelineOptions.class)
-        .as(ApexPipelineOptions.class);
-
-    if (FILE_SYSTEMS_INTIIALIZED.compareAndSet(false, true)) {
-      FileSystems.setDefaultPipelineOptions(pipelineOptions);
-    }
-  }
-
-  /**
-   * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing
-   * for user specified configuration injection into the ObjectMapper. This supports user custom
-   * types on {@link PipelineOptions}.
-   */
-  private static ObjectMapper createMapper() {
-    return new ObjectMapper().registerModules(
-        ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java
deleted file mode 100644
index 118ff99..0000000
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/PipelineOptionsTest.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translation.utils;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import com.datatorrent.common.util.FSStorageAgent;
-import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.auto.service.AutoService;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.junit.Test;
-
-/**
- * Tests the serialization of PipelineOptions.
- */
-public class PipelineOptionsTest {
-
-  /**
-   * Interface for testing.
-   */
-  public interface MyOptions extends ApexPipelineOptions {
-    @Description("Bla bla bla")
-    @Default.String("Hello")
-    String getTestOption();
-    void setTestOption(String value);
-  }
-
-  private static class OptionsWrapper {
-    private OptionsWrapper() {
-      this(null); // required for Kryo
-    }
-    private OptionsWrapper(ApexPipelineOptions options) {
-      this.options = new SerializablePipelineOptions(options);
-    }
-    @Bind(JavaSerializer.class)
-    private final SerializablePipelineOptions options;
-  }
-
-  @Test
-  public void testSerialization() {
-    OptionsWrapper wrapper = new OptionsWrapper(
-        PipelineOptionsFactory.fromArgs("--testOption=nothing").as(MyOptions.class));
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    FSStorageAgent.store(bos, wrapper);
-
-    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
-    OptionsWrapper wrapperCopy = (OptionsWrapper) FSStorageAgent.retrieve(bis);
-    assertNotNull(wrapperCopy.options);
-    assertEquals("nothing", wrapperCopy.options.get().as(MyOptions.class).getTestOption());
-  }
-
-  @Test
-  public void testSerializationWithUserCustomType() {
-    OptionsWrapper wrapper = new OptionsWrapper(
-        PipelineOptionsFactory.fromArgs("--jacksonIncompatible=\"testValue\"")
-            .as(JacksonIncompatibleOptions.class));
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    FSStorageAgent.store(bos, wrapper);
-
-    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
-    OptionsWrapper wrapperCopy = (OptionsWrapper) FSStorageAgent.retrieve(bis);
-    assertNotNull(wrapperCopy.options);
-    assertEquals("testValue",
-        wrapperCopy.options.get().as(JacksonIncompatibleOptions.class)
-            .getJacksonIncompatible().value);
-  }
-
-  /** PipelineOptions used to test auto registration of Jackson modules. */
-  public interface JacksonIncompatibleOptions extends ApexPipelineOptions {
-    JacksonIncompatible getJacksonIncompatible();
-    void setJacksonIncompatible(JacksonIncompatible value);
-  }
-
-  /** A Jackson {@link Module} to test auto-registration of modules. */
-  @AutoService(Module.class)
-  public static class RegisteredTestModule extends SimpleModule {
-    public RegisteredTestModule() {
-      super("RegisteredTestModule");
-      setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class);
-    }
-  }
-
-  /** A class which Jackson does not know how to serialize/deserialize. */
-  public static class JacksonIncompatible {
-    private final String value;
-    public JacksonIncompatible(String value) {
-      this.value = value;
-    }
-  }
-
-  /** A Jackson mixin used to add annotations to other classes. */
-  @JsonDeserialize(using = JacksonIncompatibleDeserializer.class)
-  @JsonSerialize(using = JacksonIncompatibleSerializer.class)
-  public static final class JacksonIncompatibleMixin {}
-
-  /** A Jackson deserializer for {@link JacksonIncompatible}. */
-  public static class JacksonIncompatibleDeserializer extends
-      JsonDeserializer<JacksonIncompatible> {
-
-    @Override
-    public JacksonIncompatible deserialize(JsonParser jsonParser,
-        DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
-      return new JacksonIncompatible(jsonParser.readValueAs(String.class));
-    }
-  }
-
-  /** A Jackson serializer for {@link JacksonIncompatible}. */
-  public static class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> {
-
-    @Override
-    public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator,
-        SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
-      jsonGenerator.writeString(jacksonIncompatible.value);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index b85b5f5..1a52914 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -65,6 +65,21 @@
     </dependency>
 
     <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SerializablePipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SerializablePipelineOptions.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SerializablePipelineOptions.java
new file mode 100644
index 0000000..e697fb2
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SerializablePipelineOptions.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+
+/**
+ * Holds a {@link PipelineOptions} in JSON serialized form and calls {@link
+ * FileSystems#setDefaultPipelineOptions(PipelineOptions)} on construction or on deserialization.
+ */
+public class SerializablePipelineOptions implements Serializable {
+  private static final ObjectMapper MAPPER =
+      new ObjectMapper()
+          .registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+
+  private final String serializedPipelineOptions;
+  private transient PipelineOptions options;
+
+  public SerializablePipelineOptions(PipelineOptions options) {
+    this.serializedPipelineOptions = serializeToJson(options);
+    this.options = options;
+    FileSystems.setDefaultPipelineOptions(options);
+  }
+
+  public PipelineOptions get() {
+    return options;
+  }
+
+  private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException {
+    is.defaultReadObject();
+    this.options = deserializeFromJson(serializedPipelineOptions);
+    // TODO https://issues.apache.org/jira/browse/BEAM-2712: remove this call.
+    FileSystems.setDefaultPipelineOptions(options);
+  }
+
+  private static String serializeToJson(PipelineOptions options) {
+    try {
+      return MAPPER.writeValueAsString(options);
+    } catch (JsonProcessingException e) {
+      throw new IllegalArgumentException("Failed to serialize PipelineOptions", e);
+    }
+  }
+
+  private static PipelineOptions deserializeFromJson(String options) {
+    try {
+      return MAPPER.readValue(options, PipelineOptions.class);
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Failed to deserialize PipelineOptions", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SerializablePipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SerializablePipelineOptionsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SerializablePipelineOptionsTest.java
new file mode 100644
index 0000000..cd470b2
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SerializablePipelineOptionsTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.junit.Assert.assertEquals;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link SerializablePipelineOptions}. */
+@RunWith(JUnit4.class)
+public class SerializablePipelineOptionsTest {
+  /** Options for testing. */
+  public interface MyOptions extends PipelineOptions {
+    String getFoo();
+
+    void setFoo(String foo);
+
+    @JsonIgnore
+    @Default.String("not overridden")
+    String getIgnoredField();
+
+    void setIgnoredField(String value);
+  }
+
+  @Test
+  public void testSerializationAndDeserialization() throws Exception {
+    PipelineOptions options =
+        PipelineOptionsFactory.fromArgs("--foo=testValue", "--ignoredField=overridden")
+            .as(MyOptions.class);
+
+    SerializablePipelineOptions serializableOptions = new SerializablePipelineOptions(options);
+    assertEquals("testValue", serializableOptions.get().as(MyOptions.class).getFoo());
+    assertEquals("overridden", serializableOptions.get().as(MyOptions.class).getIgnoredField());
+
+    SerializablePipelineOptions copy = SerializableUtils.clone(serializableOptions);
+    assertEquals("testValue", copy.get().as(MyOptions.class).getFoo());
+    assertEquals("not overridden", copy.get().as(MyOptions.class).getIgnoredField());
+  }
+
+  @Test
+  public void testIndependence() throws Exception {
+    SerializablePipelineOptions first =
+        new SerializablePipelineOptions(
+            PipelineOptionsFactory.fromArgs("--foo=first").as(MyOptions.class));
+    SerializablePipelineOptions firstCopy = SerializableUtils.clone(first);
+    SerializablePipelineOptions second =
+        new SerializablePipelineOptions(
+            PipelineOptionsFactory.fromArgs("--foo=second").as(MyOptions.class));
+    SerializablePipelineOptions secondCopy = SerializableUtils.clone(second);
+
+    assertEquals("first", first.get().as(MyOptions.class).getFoo());
+    assertEquals("first", firstCopy.get().as(MyOptions.class).getFoo());
+    assertEquals("second", second.get().as(MyOptions.class).getFoo());
+    assertEquals("second", secondCopy.get().as(MyOptions.class).getFoo());
+
+    first.get().as(MyOptions.class).setFoo("new first");
+    firstCopy.get().as(MyOptions.class).setFoo("new firstCopy");
+    second.get().as(MyOptions.class).setFoo("new second");
+    secondCopy.get().as(MyOptions.class).setFoo("new secondCopy");
+
+    assertEquals("new first", first.get().as(MyOptions.class).getFoo());
+    assertEquals("new firstCopy", firstCopy.get().as(MyOptions.class).getFoo());
+    assertEquals("new second", second.get().as(MyOptions.class).getFoo());
+    assertEquals("new secondCopy", secondCopy.get().as(MyOptions.class).getFoo());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index c063a2d..06746fd 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -256,16 +256,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-core</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index d8ed622..3048168 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -22,9 +22,9 @@ import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
@@ -50,7 +50,7 @@ import org.apache.flink.util.Collector;
 public class FlinkDoFnFunction<InputT, OutputT>
     extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<OutputT>> {
 
-  private final SerializedPipelineOptions serializedOptions;
+  private final SerializablePipelineOptions serializedOptions;
 
   private final DoFn<InputT, OutputT> doFn;
   private final String stepName;
@@ -75,7 +75,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
     this.doFn = doFn;
     this.stepName = stepName;
     this.sideInputs = sideInputs;
-    this.serializedOptions = new SerializedPipelineOptions(options);
+    this.serializedOptions = new SerializablePipelineOptions(options);
     this.windowingStrategy = windowingStrategy;
     this.outputMap = outputMap;
     this.mainOutputTag = mainOutputTag;
@@ -101,7 +101,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
     List<TupleTag<?>> additionalOutputTags = Lists.newArrayList(outputMap.keySet());
 
     DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner(
-        serializedOptions.getPipelineOptions(), doFn,
+        serializedOptions.get(), doFn,
         new FlinkSideInputReader(sideInputs, runtimeContext),
         outputManager,
         mainOutputTag,
@@ -109,7 +109,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
         new FlinkNoOpStepContext(),
         windowingStrategy);
 
-    if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class))
+    if ((serializedOptions.get().as(FlinkPipelineOptions.class))
         .getEnableMetrics()) {
       doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index 13be913..c73dade 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.flink.translation.functions;
 
 import java.util.Map;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -47,7 +47,7 @@ public class FlinkMergingNonShuffleReduceFunction<
 
   private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
 
-  private final SerializedPipelineOptions serializedOptions;
+  private final SerializablePipelineOptions serializedOptions;
 
   public FlinkMergingNonShuffleReduceFunction(
       CombineFnBase.GlobalCombineFn<InputT, AccumT, OutputT> combineFn,
@@ -60,7 +60,7 @@ public class FlinkMergingNonShuffleReduceFunction<
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;
 
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+    this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
 
   }
 
@@ -69,7 +69,7 @@ public class FlinkMergingNonShuffleReduceFunction<
       Iterable<WindowedValue<KV<K, InputT>>> elements,
       Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
 
-    PipelineOptions options = serializedOptions.getPipelineOptions();
+    PipelineOptions options = serializedOptions.get();
 
     FlinkSideInputReader sideInputReader =
         new FlinkSideInputReader(sideInputs, getRuntimeContext());

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index db12a49..49e821c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.flink.translation.functions;
 
 import java.util.Map;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -46,7 +46,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
 
   protected final WindowingStrategy<Object, W> windowingStrategy;
 
-  protected final SerializedPipelineOptions serializedOptions;
+  protected final SerializablePipelineOptions serializedOptions;
 
   protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
 
@@ -59,7 +59,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
     this.combineFn = combineFn;
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+    this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
 
   }
 
@@ -68,7 +68,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
       Iterable<WindowedValue<KV<K, InputT>>> elements,
       Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception {
 
-    PipelineOptions options = serializedOptions.getPipelineOptions();
+    PipelineOptions options = serializedOptions.get();
 
     FlinkSideInputReader sideInputReader =
         new FlinkSideInputReader(sideInputs, getRuntimeContext());

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index 53d71d8..6645b3a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.flink.translation.functions;
 
 import java.util.Map;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.CombineFnBase;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -48,7 +48,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
 
   protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
 
-  protected final SerializedPipelineOptions serializedOptions;
+  protected final SerializablePipelineOptions serializedOptions;
 
   public FlinkReduceFunction(
       CombineFnBase.GlobalCombineFn<?, AccumT, OutputT> combineFn,
@@ -61,7 +61,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;
 
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+    this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
 
   }
 
@@ -70,7 +70,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
       Iterable<WindowedValue<KV<K, AccumT>>> elements,
       Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
 
-    PipelineOptions options = serializedOptions.getPipelineOptions();
+    PipelineOptions options = serializedOptions.get();
 
     FlinkSideInputReader sideInputReader =
         new FlinkSideInputReader(sideInputs, getRuntimeContext());

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index 11d4fee..412269c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -31,9 +31,9 @@ import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -61,7 +61,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
   private String stepName;
   private final WindowingStrategy<?, ?> windowingStrategy;
   private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
-  private final SerializedPipelineOptions serializedOptions;
+  private final SerializablePipelineOptions serializedOptions;
   private final Map<TupleTag<?>, Integer> outputMap;
   private final TupleTag<OutputT> mainOutputTag;
   private transient DoFnInvoker doFnInvoker;
@@ -79,7 +79,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
     this.stepName = stepName;
     this.windowingStrategy = windowingStrategy;
     this.sideInputs = sideInputs;
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+    this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
     this.outputMap = outputMap;
     this.mainOutputTag = mainOutputTag;
   }
@@ -118,7 +118,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
     List<TupleTag<?>> additionalOutputTags = Lists.newArrayList(outputMap.keySet());
 
     DoFnRunner<KV<K, V>, OutputT> doFnRunner = DoFnRunners.simpleRunner(
-        serializedOptions.getPipelineOptions(), dofn,
+        serializedOptions.get(), dofn,
         new FlinkSideInputReader(sideInputs, runtimeContext),
         outputManager,
         mainOutputTag,
@@ -135,7 +135,7 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
         },
         windowingStrategy);
 
-    if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class))
+    if ((serializedOptions.get().as(FlinkPipelineOptions.class))
         .getEnableMetrics()) {
       doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
deleted file mode 100644
index 40b6dd6..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.flink.translation.utils;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.beam.sdk.io.FileSystems;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
-
-/**
- * Encapsulates the PipelineOptions in serialized form to ship them to the cluster.
- */
-public class SerializedPipelineOptions implements Serializable {
-
-  private final byte[] serializedOptions;
-
-  /** Lazily initialized copy of deserialized options. */
-  private transient PipelineOptions pipelineOptions;
-
-  public SerializedPipelineOptions(PipelineOptions options) {
-    checkNotNull(options, "PipelineOptions must not be null.");
-
-    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-      createMapper().writeValue(baos, options);
-      this.serializedOptions = baos.toByteArray();
-    } catch (Exception e) {
-      throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
-    }
-
-  }
-
-  public PipelineOptions getPipelineOptions() {
-    if (pipelineOptions == null) {
-      try {
-        pipelineOptions = createMapper().readValue(serializedOptions, PipelineOptions.class);
-
-        FileSystems.setDefaultPipelineOptions(pipelineOptions);
-      } catch (IOException e) {
-        throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
-      }
-    }
-
-    return pipelineOptions;
-  }
-
-  /**
-   * Use an {@link ObjectMapper} configured with any {@link Module}s in the class path allowing
-   * for user specified configuration injection into the ObjectMapper. This supports user custom
-   * types on {@link PipelineOptions}.
-   */
-  private static ObjectMapper createMapper() {
-    return new ObjectMapper().registerModules(
-        ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
index 27e6912..3f9d601 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -19,9 +19,9 @@ package org.apache.beam.runners.flink.translation.wrappers;
 
 import java.io.IOException;
 import java.util.List;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -50,7 +50,7 @@ public class SourceInputFormat<T>
   private final BoundedSource<T> initialSource;
 
   private transient PipelineOptions options;
-  private final SerializedPipelineOptions serializedOptions;
+  private final SerializablePipelineOptions serializedOptions;
 
   private transient BoundedSource.BoundedReader<T> reader;
   private boolean inputAvailable = false;
@@ -61,12 +61,12 @@ public class SourceInputFormat<T>
       String stepName, BoundedSource<T> initialSource, PipelineOptions options) {
     this.stepName = stepName;
     this.initialSource = initialSource;
-    this.serializedOptions = new SerializedPipelineOptions(options);
+    this.serializedOptions = new SerializablePipelineOptions(options);
   }
 
   @Override
   public void configure(Configuration configuration) {
-    options = serializedOptions.getPipelineOptions();
+    options = serializedOptions.get();
   }
 
   @Override
@@ -76,7 +76,7 @@ public class SourceInputFormat<T>
     readerInvoker =
         new ReaderInvocationUtil<>(
             stepName,
-            serializedOptions.getPipelineOptions(),
+            serializedOptions.get(),
             metricContainer);
 
     reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options);

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 7995ea8..62de423 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -47,10 +47,10 @@ import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.StatefulDoFnRunner;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate;
 import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkKeyGroupStateInternals;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkSplitStateInternals;
@@ -106,7 +106,7 @@ public class DoFnOperator<InputT, OutputT>
 
   protected DoFn<InputT, OutputT> doFn;
 
-  protected final SerializedPipelineOptions serializedOptions;
+  protected final SerializablePipelineOptions serializedOptions;
 
   protected final TupleTag<OutputT> mainOutputTag;
   protected final List<TupleTag<?>> additionalOutputTags;
@@ -174,7 +174,7 @@ public class DoFnOperator<InputT, OutputT>
     this.additionalOutputTags = additionalOutputTags;
     this.sideInputTagMapping = sideInputTagMapping;
     this.sideInputs = sideInputs;
-    this.serializedOptions = new SerializedPipelineOptions(options);
+    this.serializedOptions = new SerializablePipelineOptions(options);
     this.windowingStrategy = windowingStrategy;
     this.outputManagerFactory = outputManagerFactory;
 
@@ -256,7 +256,7 @@ public class DoFnOperator<InputT, OutputT>
     org.apache.beam.runners.core.StepContext stepContext = createStepContext();
 
     doFnRunner = DoFnRunners.simpleRunner(
-        serializedOptions.getPipelineOptions(),
+        serializedOptions.get(),
         doFn,
         sideInputReader,
         outputManager,
@@ -301,7 +301,7 @@ public class DoFnOperator<InputT, OutputT>
           stateCleaner);
     }
 
-    if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class))
+    if ((serializedOptions.get().as(FlinkPipelineOptions.class))
         .getEnableMetrics()) {
       doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index 2f095d4..be758a6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -115,7 +115,7 @@ public class SplittableDoFnOperator<
     ((ProcessFn) doFn).setProcessElementInvoker(
         new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
             doFn,
-            serializedOptions.getPipelineOptions(),
+            serializedOptions.get(),
             new OutputWindowedValue<OutputT>() {
               @Override
               public void outputWindowedValue(

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
index 6d75688..5ddc46f 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -20,9 +20,9 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -48,7 +48,7 @@ public class BoundedSourceWrapper<OutputT>
   /**
    * Keep the options so that we can initialize the readers.
    */
-  private final SerializedPipelineOptions serializedOptions;
+  private final SerializablePipelineOptions serializedOptions;
 
   /**
    * The split sources. We split them in the constructor to ensure that all parallel
@@ -74,7 +74,7 @@ public class BoundedSourceWrapper<OutputT>
       BoundedSource<OutputT> source,
       int parallelism) throws Exception {
     this.stepName = stepName;
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+    this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
 
     long desiredBundleSize = source.getEstimatedSizeBytes(pipelineOptions) / parallelism;
 
@@ -109,13 +109,13 @@ public class BoundedSourceWrapper<OutputT>
     ReaderInvocationUtil<OutputT, BoundedSource.BoundedReader<OutputT>> readerInvoker =
         new ReaderInvocationUtil<>(
             stepName,
-            serializedOptions.getPipelineOptions(),
+            serializedOptions.get(),
             metricContainer);
 
     readers = new ArrayList<>();
     // initialize readers from scratch
     for (BoundedSource<OutputT> source : localSources) {
-      readers.add(source.createReader(serializedOptions.getPipelineOptions()));
+      readers.add(source.createReader(serializedOptions.get()));
     }
 
    if (readers.size() == 1) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index e75072a..817dd74 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -22,10 +22,10 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
 import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
@@ -72,7 +72,7 @@ public class UnboundedSourceWrapper<
   /**
    * Keep the options so that we can initialize the localReaders.
    */
-  private final SerializedPipelineOptions serializedOptions;
+  private final SerializablePipelineOptions serializedOptions;
 
   /**
    * For snapshot and restore.
@@ -141,7 +141,7 @@ public class UnboundedSourceWrapper<
       UnboundedSource<OutputT, CheckpointMarkT> source,
       int parallelism) throws Exception {
     this.stepName = stepName;
-    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+    this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
 
     if (source.requiresDeduping()) {
       LOG.warn("Source {} requires deduping but Flink runner doesn't support this yet.", source);
@@ -189,7 +189,7 @@ public class UnboundedSourceWrapper<
           stateForCheckpoint.get()) {
         localSplitSources.add(restored.getKey());
         localReaders.add(restored.getKey().createReader(
-            serializedOptions.getPipelineOptions(), restored.getValue()));
+            serializedOptions.get(), restored.getValue()));
       }
     } else {
       // initialize localReaders and localSources from scratch
@@ -198,7 +198,7 @@ public class UnboundedSourceWrapper<
           UnboundedSource<OutputT, CheckpointMarkT> source =
               splitSources.get(i);
           UnboundedSource.UnboundedReader<OutputT> reader =
-              source.createReader(serializedOptions.getPipelineOptions(), null);
+              source.createReader(serializedOptions.get(), null);
           localSplitSources.add(source);
           localReaders.add(reader);
         }
@@ -221,7 +221,7 @@ public class UnboundedSourceWrapper<
     ReaderInvocationUtil<OutputT, UnboundedSource.UnboundedReader<OutputT>> readerInvoker =
         new ReaderInvocationUtil<>(
             stepName,
-            serializedOptions.getPipelineOptions(),
+            serializedOptions.get(),
             metricContainer);
 
     if (localReaders.size() == 0) {

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
index d0281ec..eb06026 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -17,32 +17,8 @@
  */
 package org.apache.beam.runners.flink;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.auto.service.AutoService;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.util.Collections;
 import java.util.HashMap;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.Default;
@@ -60,12 +36,10 @@ import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.joda.time.Instant;
 import org.junit.Assert;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 /**
@@ -73,9 +47,7 @@ import org.junit.Test;
  */
 public class PipelineOptionsTest {
 
-  /**
-   * Pipeline options.
-   */
+  /** Pipeline options. */
   public interface MyOptions extends FlinkPipelineOptions {
     @Description("Bla bla bla")
     @Default.String("Hello")
@@ -83,60 +55,12 @@ public class PipelineOptionsTest {
     void setTestOption(String value);
   }
 
-  private static MyOptions options;
-  private static SerializedPipelineOptions serializedOptions;
-
-  private static final String[] args = new String[]{"--testOption=nothing"};
-
-  @BeforeClass
-  public static void beforeTest() {
-    options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
-    serializedOptions = new SerializedPipelineOptions(options);
-  }
-
-  @Test
-  public void testDeserialization() {
-    MyOptions deserializedOptions = serializedOptions.getPipelineOptions().as(MyOptions.class);
-    assertEquals("nothing", deserializedOptions.getTestOption());
-  }
-
-  @Test
-  public void testIgnoredFieldSerialization() {
-    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
-    options.setStateBackend(new MemoryStateBackend());
-
-    FlinkPipelineOptions deserialized =
-        new SerializedPipelineOptions(options).getPipelineOptions().as(FlinkPipelineOptions.class);
-
-    assertNull(deserialized.getStateBackend());
-  }
-
-  @Test
-  public void testEnableMetrics() {
-    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
-    options.setEnableMetrics(false);
-    assertFalse(options.getEnableMetrics());
-  }
-
-  @Test
-  public void testCaching() {
-    PipelineOptions deserializedOptions =
-        serializedOptions.getPipelineOptions().as(PipelineOptions.class);
-
-    assertNotNull(deserializedOptions);
-    assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
-    assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
-    assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
-  }
-
-  @Test(expected = Exception.class)
-  public void testNonNull() {
-    new SerializedPipelineOptions(null);
-  }
+  private static MyOptions options =
+      PipelineOptionsFactory.fromArgs("--testOption=nothing").as(MyOptions.class);
 
   @Test(expected = Exception.class)
   public void parDoBaseClassPipelineOptionsNullTest() {
-    DoFnOperator<String, String> doFnOperator = new DoFnOperator<>(
+    new DoFnOperator<>(
         new TestDoFn(),
         "stepName",
         WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
@@ -196,18 +120,7 @@ public class PipelineOptionsTest {
 
   }
 
-  @Test
-  public void testExternalizedCheckpointsConfigs() {
-    String[] args = new String[] { "--externalizedCheckpointsEnabled=true",
-        "--retainExternalizedCheckpointsOnCancellation=false" };
-    final FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
-        .as(FlinkPipelineOptions.class);
-    assertEquals(options.isExternalizedCheckpointsEnabled(), true);
-    assertEquals(options.getRetainExternalizedCheckpointsOnCancellation(), false);
-  }
-
   private static class TestDoFn extends DoFn<String, String> {
-
     @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       Assert.assertNotNull(c.getPipelineOptions());
@@ -216,74 +129,4 @@ public class PipelineOptionsTest {
           c.getPipelineOptions().as(MyOptions.class).getTestOption());
     }
   }
-
-  /** PipelineOptions used to test auto registration of Jackson modules. */
-  public interface JacksonIncompatibleOptions extends PipelineOptions {
-    JacksonIncompatible getJacksonIncompatible();
-    void setJacksonIncompatible(JacksonIncompatible value);
-  }
-
-  /** A Jackson {@link Module} to test auto-registration of modules. */
-  @AutoService(Module.class)
-  public static class RegisteredTestModule extends SimpleModule {
-    public RegisteredTestModule() {
-      super("RegisteredTestModule");
-      setMixInAnnotation(JacksonIncompatible.class, JacksonIncompatibleMixin.class);
-    }
-  }
-
-  /** A class which Jackson does not know how to serialize/deserialize. */
-  public static class JacksonIncompatible {
-    private final String value;
-    public JacksonIncompatible(String value) {
-      this.value = value;
-    }
-  }
-
-  /** A Jackson mixin used to add annotations to other classes. */
-  @JsonDeserialize(using = JacksonIncompatibleDeserializer.class)
-  @JsonSerialize(using = JacksonIncompatibleSerializer.class)
-  public static final class JacksonIncompatibleMixin {}
-
-  /** A Jackson deserializer for {@link JacksonIncompatible}. */
-  public static class JacksonIncompatibleDeserializer extends
-      JsonDeserializer<JacksonIncompatible> {
-
-    @Override
-    public JacksonIncompatible deserialize(JsonParser jsonParser,
-        DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
-      return new JacksonIncompatible(jsonParser.readValueAs(String.class));
-    }
-  }
-
-  /** A Jackson serializer for {@link JacksonIncompatible}. */
-  public static class JacksonIncompatibleSerializer extends JsonSerializer<JacksonIncompatible> {
-
-    @Override
-    public void serialize(JacksonIncompatible jacksonIncompatible, JsonGenerator jsonGenerator,
-        SerializerProvider serializerProvider) throws IOException, JsonProcessingException {
-      jsonGenerator.writeString(jacksonIncompatible.value);
-    }
-  }
-
-  @Test
-  public void testSerializingPipelineOptionsWithCustomUserType() throws Exception {
-    String expectedValue = "testValue";
-    PipelineOptions options = PipelineOptionsFactory
-        .fromArgs("--jacksonIncompatible=\"" + expectedValue + "\"")
-        .as(JacksonIncompatibleOptions.class);
-    SerializedPipelineOptions context = new SerializedPipelineOptions(options);
-
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    try (ObjectOutputStream outputStream = new ObjectOutputStream(baos)) {
-      outputStream.writeObject(context);
-    }
-    try (ObjectInputStream inputStream =
-        new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
-      SerializedPipelineOptions copy = (SerializedPipelineOptions) inputStream.readObject();
-      assertEquals(expectedValue,
-          copy.getPipelineOptions().as(JacksonIncompatibleOptions.class)
-              .getJacksonIncompatible().value);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index e823060..b2e7fe4 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -35,7 +35,6 @@
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <kafka.version>0.9.0.1</kafka.version>
-    <jackson.version>2.4.4</jackson.version>
     <dropwizard.metrics.version>3.1.2</dropwizard.metrics.version>
   </properties>
 
@@ -184,18 +183,7 @@
     </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-core</artifactId>
-      <version>${jackson.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
-      <version>${jackson.version}</version>
-    </dependency>
-      <dependency>
-      <groupId>com.fasterxml.jackson.core</groupId>
-      <artifactId>jackson-databind</artifactId>
-        <version>${jackson.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.avro</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
index 27f2ec8..a9f2c44 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
@@ -19,18 +19,11 @@
 package org.apache.beam.runners.spark.aggregators;
 
 import com.google.common.base.Function;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.TreeMap;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Combine;
 
 /**
@@ -52,17 +45,6 @@ public class NamedAggregators implements Serializable {
   }
 
   /**
-   * Constructs a new named aggregators instance that contains a mapping from the specified
-   * `named` to the associated initial state.
-   *
-   * @param name  Name of aggregator.
-   * @param state Associated State.
-   */
-  public NamedAggregators(String name, State<?, ?, ?> state) {
-    this.mNamedAggregators.put(name, state);
-  }
-
-  /**
    * @param name      Name of aggregator to retrieve.
    * @param typeClass Type class to cast the value to.
    * @param <T>       Type to be returned.
@@ -152,79 +134,4 @@ public class NamedAggregators implements Serializable {
     Combine.CombineFn<InputT, InterT, OutputT> getCombineFn();
   }
 
-  /**
-   * @param <InputT> Input data type
-   * @param <InterT> Intermediate data type (useful for averages)
-   * @param <OutputT> Output data type
-   */
-  public static class CombineFunctionState<InputT, InterT, OutputT>
-      implements State<InputT, InterT, OutputT> {
-
-    private Combine.CombineFn<InputT, InterT, OutputT> combineFn;
-    private Coder<InputT> inCoder;
-    private SparkRuntimeContext ctxt;
-    private transient InterT state;
-
-    public CombineFunctionState(
-        Combine.CombineFn<InputT, InterT, OutputT> combineFn,
-        Coder<InputT> inCoder,
-        SparkRuntimeContext ctxt) {
-      this.combineFn = combineFn;
-      this.inCoder = inCoder;
-      this.ctxt = ctxt;
-      this.state = combineFn.createAccumulator();
-    }
-
-    @Override
-    public void update(InputT element) {
-      combineFn.addInput(state, element);
-    }
-
-    @Override
-    public State<InputT, InterT, OutputT> merge(State<InputT, InterT, OutputT> other) {
-      this.state = combineFn.mergeAccumulators(ImmutableList.of(current(), other.current()));
-      return this;
-    }
-
-    @Override
-    public InterT current() {
-      return state;
-    }
-
-    @Override
-    public OutputT render() {
-      return combineFn.extractOutput(state);
-    }
-
-    @Override
-    public Combine.CombineFn<InputT, InterT, OutputT> getCombineFn() {
-      return combineFn;
-    }
-
-    private void writeObject(ObjectOutputStream oos) throws IOException {
-      oos.writeObject(ctxt);
-      oos.writeObject(combineFn);
-      oos.writeObject(inCoder);
-      try {
-        combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
-            .encode(state, oos);
-      } catch (CannotProvideCoderException e) {
-        throw new IllegalStateException("Could not determine coder for accumulator", e);
-      }
-    }
-
-    @SuppressWarnings("unchecked")
-    private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
-      ctxt = (SparkRuntimeContext) ois.readObject();
-      combineFn = (Combine.CombineFn<InputT, InterT, OutputT>) ois.readObject();
-      inCoder = (Coder<InputT>) ois.readObject();
-      try {
-        state = combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)
-            .decode(ois);
-      } catch (CannotProvideCoderException e) {
-        throw new IllegalStateException("Could not determine coder for accumulator", e);
-      }
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
index 20aca5f..b7000b4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -20,8 +20,8 @@ package org.apache.beam.runners.spark.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.spark.api.java.JavaSparkContext$;
@@ -58,7 +58,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
   private static final Logger LOG = LoggerFactory.getLogger(SourceDStream.class);
 
   private final UnboundedSource<T, CheckpointMarkT> unboundedSource;
-  private final SparkRuntimeContext runtimeContext;
+  private final SerializablePipelineOptions options;
   private final Duration boundReadDuration;
   // Reader cache interval to expire readers if they haven't been accessed in the last microbatch.
   // The reason we expire readers is that upon executor death/addition source split ownership can be
@@ -81,20 +81,20 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
   SourceDStream(
       StreamingContext ssc,
       UnboundedSource<T, CheckpointMarkT> unboundedSource,
-      SparkRuntimeContext runtimeContext,
+      SerializablePipelineOptions options,
       Long boundMaxRecords) {
     super(ssc, JavaSparkContext$.MODULE$.<scala.Tuple2<Source<T>, CheckpointMarkT>>fakeClassTag());
     this.unboundedSource = unboundedSource;
-    this.runtimeContext = runtimeContext;
+    this.options = options;
 
-    SparkPipelineOptions options = runtimeContext.getPipelineOptions().as(
+    SparkPipelineOptions sparkOptions = options.get().as(
         SparkPipelineOptions.class);
 
     // Reader cache expiration interval. 50% of batch interval is added to accommodate latency.
-    this.readerCacheInterval = 1.5 * options.getBatchIntervalMillis();
+    this.readerCacheInterval = 1.5 * sparkOptions.getBatchIntervalMillis();
 
-    this.boundReadDuration = boundReadDuration(options.getReadTimePercentage(),
-        options.getMinReadTimeMillis());
+    this.boundReadDuration = boundReadDuration(sparkOptions.getReadTimePercentage(),
+        sparkOptions.getMinReadTimeMillis());
     // set initial parallelism once.
     this.initialParallelism = ssc().sparkContext().defaultParallelism();
     checkArgument(this.initialParallelism > 0, "Number of partitions must be greater than zero.");
@@ -104,7 +104,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
     try {
       this.numPartitions =
           createMicrobatchSource()
-              .split(options)
+              .split(sparkOptions)
               .size();
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -116,7 +116,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
     RDD<scala.Tuple2<Source<T>, CheckpointMarkT>> rdd =
         new SourceRDD.Unbounded<>(
             ssc().sparkContext(),
-            runtimeContext,
+            options,
             createMicrobatchSource(),
             numPartitions);
     return scala.Option.apply(rdd);

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
index 01cc176..a225e0f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
@@ -28,9 +28,9 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -66,7 +66,7 @@ public class SourceRDD {
     private static final Logger LOG = LoggerFactory.getLogger(SourceRDD.Bounded.class);
 
     private final BoundedSource<T> source;
-    private final SparkRuntimeContext runtimeContext;
+    private final SerializablePipelineOptions options;
     private final int numPartitions;
     private final String stepName;
     private final Accumulator<MetricsContainerStepMap> metricsAccum;
@@ -79,11 +79,11 @@ public class SourceRDD {
     public Bounded(
         SparkContext sc,
         BoundedSource<T> source,
-        SparkRuntimeContext runtimeContext,
+        SerializablePipelineOptions options,
         String stepName) {
       super(sc, NIL, JavaSparkContext$.MODULE$.<WindowedValue<T>>fakeClassTag());
       this.source = source;
-      this.runtimeContext = runtimeContext;
+      this.options = options;
       // the input parallelism is determined by Spark's scheduler backend.
       // when running on YARN/SparkDeploy it's the result of max(totalCores, 2).
       // when running on Mesos it's 8.
@@ -103,14 +103,14 @@ public class SourceRDD {
       long desiredSizeBytes = DEFAULT_BUNDLE_SIZE;
       try {
         desiredSizeBytes = source.getEstimatedSizeBytes(
-            runtimeContext.getPipelineOptions()) / numPartitions;
+            options.get()) / numPartitions;
       } catch (Exception e) {
         LOG.warn("Failed to get estimated bundle size for source {}, using default bundle "
             + "size of {} bytes.", source, DEFAULT_BUNDLE_SIZE);
       }
       try {
         List<? extends Source<T>> partitionedSources = source.split(desiredSizeBytes,
-            runtimeContext.getPipelineOptions());
+            options.get());
         Partition[] partitions = new SourcePartition[partitionedSources.size()];
         for (int i = 0; i < partitionedSources.size(); i++) {
           partitions[i] = new SourcePartition<>(id(), i, partitionedSources.get(i));
@@ -125,7 +125,7 @@ public class SourceRDD {
     private BoundedSource.BoundedReader<T> createReader(SourcePartition<T> partition) {
       try {
         return ((BoundedSource<T>) partition.source).createReader(
-            runtimeContext.getPipelineOptions());
+            options.get());
       } catch (IOException e) {
         throw new RuntimeException("Failed to create reader from a BoundedSource.", e);
       }
@@ -293,7 +293,7 @@ public class SourceRDD {
         UnboundedSource.CheckpointMark> extends RDD<scala.Tuple2<Source<T>, CheckpointMarkT>> {
 
     private final MicrobatchSource<T, CheckpointMarkT> microbatchSource;
-    private final SparkRuntimeContext runtimeContext;
+    private final SerializablePipelineOptions options;
     private final Partitioner partitioner;
 
     // to satisfy Scala API.
@@ -302,12 +302,12 @@ public class SourceRDD {
             .asScalaBuffer(Collections.<Dependency<?>>emptyList()).toList();
 
     public Unbounded(SparkContext sc,
-        SparkRuntimeContext runtimeContext,
+        SerializablePipelineOptions options,
         MicrobatchSource<T, CheckpointMarkT> microbatchSource,
         int initialNumPartitions) {
       super(sc, NIL,
           JavaSparkContext$.MODULE$.<scala.Tuple2<Source<T>, CheckpointMarkT>>fakeClassTag());
-      this.runtimeContext = runtimeContext;
+      this.options = options;
       this.microbatchSource = microbatchSource;
       this.partitioner = new HashPartitioner(initialNumPartitions);
     }
@@ -316,7 +316,7 @@ public class SourceRDD {
     public Partition[] getPartitions() {
       try {
         final List<? extends Source<T>> partitionedSources =
-            microbatchSource.split(runtimeContext.getPipelineOptions());
+            microbatchSource.split(options.get());
         final Partition[] partitions = new CheckpointableSourcePartition[partitionedSources.size()];
         for (int i = 0; i < partitionedSources.size(); i++) {
           partitions[i] =

http://git-wip-us.apache.org/repos/asf/beam/blob/7db051ae/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
index 7106c73..b31aa9f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -22,12 +22,12 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.stateful.StateSpecFunctions;
-import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
 import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
@@ -80,11 +80,11 @@ public class SparkUnboundedSource {
 
   public static <T, CheckpointMarkT extends CheckpointMark> UnboundedDataset<T> read(
       JavaStreamingContext jssc,
-      SparkRuntimeContext rc,
+      SerializablePipelineOptions rc,
       UnboundedSource<T, CheckpointMarkT> source,
       String stepName) {
 
-    SparkPipelineOptions options = rc.getPipelineOptions().as(SparkPipelineOptions.class);
+    SparkPipelineOptions options = rc.get().as(SparkPipelineOptions.class);
     Long maxRecordsPerBatch = options.getMaxRecordsPerBatch();
     SourceDStream<T, CheckpointMarkT> sourceDStream =
         new SourceDStream<>(jssc.ssc(), source, rc, maxRecordsPerBatch);


Mime
View raw message