beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] beam git commit: Use RehydratedComponents for memoized rehydration
Date Fri, 21 Jul 2017 18:29:58 GMT
Repository: beam
Updated Branches:
  refs/heads/master b6f126dc6 -> f870bf516


Use RehydratedComponents for memoized rehydration


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/01492e69
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/01492e69
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/01492e69

Branch: refs/heads/master
Commit: 01492e69d1b1eedb99931f7071a9dae9d60d25fe
Parents: b6f126d
Author: Kenneth Knowles <klk@google.com>
Authored: Wed Jul 19 20:58:36 2017 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Fri Jul 21 11:05:41 2017 -0700

----------------------------------------------------------------------
 .../core/construction/CoderTranslation.java     |  14 +-
 .../core/construction/CombineTranslation.java   |   7 +-
 .../construction/PCollectionTranslation.java    |  28 +---
 .../core/construction/ParDoTranslation.java     |  48 ++----
 .../core/construction/RehydratedComponents.java | 158 +++++++++++++++++++
 .../construction/TestStreamTranslation.java     |  11 +-
 .../WindowingStrategyTranslation.java           |   6 +-
 .../core/construction/CoderTranslationTest.java |   4 +-
 .../construction/CombineTranslationTest.java    |   9 +-
 .../PCollectionTranslationTest.java             |  12 +-
 .../core/construction/ParDoTranslationTest.java |  61 +++----
 .../construction/RehydratedComponentsTest.java  |  96 +++++++++++
 .../construction/TestStreamTranslationTest.java |  11 +-
 .../WindowingStrategyTranslationTest.java       |   5 +-
 .../beam/runners/dataflow/DataflowRunner.java   |  16 +-
 15 files changed, 355 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/01492e69/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java
index 470db6a..a6719ff 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslation.java
@@ -40,7 +40,6 @@ import org.apache.beam.sdk.coders.LengthPrefixCoder;
 import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -149,21 +148,22 @@ public class CoderTranslation {
         .build();
   }
 
-  public static Coder<?> fromProto(RunnerApi.Coder protoCoder, Components components)
+  public static Coder<?> fromProto(
+      RunnerApi.Coder protoCoder, RehydratedComponents components)
       throws IOException {
     String coderSpecUrn = protoCoder.getSpec().getSpec().getUrn();
     if (coderSpecUrn.equals(JAVA_SERIALIZED_CODER_URN)) {
-      return fromCustomCoder(protoCoder, components);
+      return fromCustomCoder(protoCoder);
     }
     return fromKnownCoder(protoCoder, components);
   }
 
-  private static Coder<?> fromKnownCoder(RunnerApi.Coder coder, Components components)
+  private static Coder<?> fromKnownCoder(RunnerApi.Coder coder, RehydratedComponents components)
       throws IOException {
     String coderUrn = coder.getSpec().getSpec().getUrn();
     List<Coder<?>> coderComponents = new LinkedList<>();
     for (String componentId : coder.getComponentCoderIdsList()) {
-      Coder<?> innerCoder = fromProto(components.getCodersOrThrow(componentId), components);
+      Coder<?> innerCoder = components.getCoder(componentId);
       coderComponents.add(innerCoder);
     }
     Class<? extends StructuredCoder> coderType = KNOWN_CODER_URNS.inverse().get(coderUrn);
@@ -176,9 +176,7 @@ public class CoderTranslation {
     return translator.fromComponents(coderComponents);
   }
 
-  private static Coder<?> fromCustomCoder(
-      RunnerApi.Coder protoCoder, @SuppressWarnings("unused") Components components)
-      throws IOException {
+  private static Coder<?> fromCustomCoder(RunnerApi.Coder protoCoder) throws IOException {
     return (Coder<?>)
         SerializableUtils.deserializeFromByteArray(
             protoCoder

http://git-wip-us.apache.org/repos/asf/beam/blob/01492e69/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
index 2e5b02c..d909ccf 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CombineTranslation.java
@@ -149,9 +149,9 @@ public class CombineTranslation {
   }
 
   public static Coder<?> getAccumulatorCoder(
-      CombinePayload payload, RunnerApi.Components components) throws IOException {
+      CombinePayload payload, RehydratedComponents components) throws IOException {
     String id = payload.getAccumulatorCoderId();
-    return CoderTranslation.fromProto(components.getCodersOrThrow(id), components);
+    return components.getCoder(id);
   }
 
   public static Coder<?> getAccumulatorCoder(
@@ -159,7 +159,8 @@ public class CombineTranslation {
     SdkComponents sdkComponents = SdkComponents.create();
     String id = getCombinePayload(transform, sdkComponents).getAccumulatorCoderId();
     Components components = sdkComponents.toComponents();
-    return CoderTranslation.fromProto(components.getCodersOrThrow(id), components);
+    return CoderTranslation.fromProto(
+        components.getCodersOrThrow(id), RehydratedComponents.forComponents(components));
   }
 
   public static GlobalCombineFn<?, ?, ?> getCombineFn(CombinePayload payload)

http://git-wip-us.apache.org/repos/asf/beam/blob/01492e69/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
index 52526bb..c0a5acf 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
@@ -18,14 +18,12 @@
 
 package org.apache.beam.runners.core.construction;
 
-import com.google.protobuf.InvalidProtocolBufferException;
 import java.io.IOException;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.WindowingStrategy;
 
 /**
  * Utility methods for translating {@link PCollection PCollections} to and from Runner API protos.
@@ -49,37 +47,21 @@ public class PCollectionTranslation {
   }
 
   public static PCollection<?> fromProto(
-      Pipeline pipeline, RunnerApi.PCollection pCollection, RunnerApi.Components components)
+      RunnerApi.PCollection pCollection, Pipeline pipeline, RehydratedComponents components)
       throws IOException {
+
+    Coder<?> coder = components.getCoder(pCollection.getCoderId());
     return PCollection.createPrimitiveOutputInternal(
             pipeline,
-            WindowingStrategyTranslation.fromProto(
-                components.getWindowingStrategiesOrThrow(pCollection.getWindowingStrategyId()),
-                components),
+            components.getWindowingStrategy(pCollection.getWindowingStrategyId()),
             fromProto(pCollection.getIsBounded()))
-        .setCoder(
-            (Coder)
-                CoderTranslation.fromProto(
-                    components.getCodersOrThrow(pCollection.getCoderId()), components));
+        .setCoder((Coder) coder);
   }
 
   public static IsBounded isBounded(RunnerApi.PCollection pCollection) {
     return fromProto(pCollection.getIsBounded());
   }
 
-  public static Coder<?> getCoder(
-      RunnerApi.PCollection pCollection, RunnerApi.Components components) throws IOException {
-    return CoderTranslation
-        .fromProto(components.getCodersOrThrow(pCollection.getCoderId()), components);
-  }
-
-  public static WindowingStrategy<?, ?> getWindowingStrategy(
-      RunnerApi.PCollection pCollection, RunnerApi.Components components)
-      throws InvalidProtocolBufferException {
-    return WindowingStrategyTranslation.fromProto(
-        components.getWindowingStrategiesOrThrow(pCollection.getWindowingStrategyId()), components);
-  }
-
   static RunnerApi.IsBounded toProto(IsBounded bounded) {
     switch (bounded) {
       case BOUNDED:

http://git-wip-us.apache.org/repos/asf/beam/blob/01492e69/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index 03f29ff..d7b0e9f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -262,6 +262,8 @@ public class ParDoTranslation {
     ParDoPayload payload = parDoProto.getSpec().getParameter().unpack(ParDoPayload.class);
 
     List<PCollectionView<?>> views = new ArrayList<>();
+    RehydratedComponents components =
+        RehydratedComponents.forComponents(sdkComponents.toComponents());
     for (Map.Entry<String, SideInput> sideInputEntry : payload.getSideInputsMap().entrySet()) {
       String sideInputTag = sideInputEntry.getKey();
       RunnerApi.SideInput sideInput = sideInputEntry.getValue();
@@ -276,7 +278,7 @@ public class ParDoTranslation {
               sideInputTag,
               originalPCollection,
               parDoProto,
-              sdkComponents.toComponents()));
+              components));
     }
     return views;
   }
@@ -353,18 +355,13 @@ public class ParDoTranslation {
   }
 
   @VisibleForTesting
-  static StateSpec<?> fromProto(RunnerApi.StateSpec stateSpec, RunnerApi.Components components)
+  static StateSpec<?> fromProto(RunnerApi.StateSpec stateSpec, RehydratedComponents components)
       throws IOException {
     switch (stateSpec.getSpecCase()) {
       case VALUE_SPEC:
-        return StateSpecs.value(
-            CoderTranslation.fromProto(
-                components.getCodersMap().get(stateSpec.getValueSpec().getCoderId()), components));
+        return StateSpecs.value(components.getCoder(stateSpec.getValueSpec().getCoderId()));
       case BAG_SPEC:
-        return StateSpecs.bag(
-            CoderTranslation.fromProto(
-                components.getCodersMap().get(stateSpec.getBagSpec().getElementCoderId()),
-                components));
+        return StateSpecs.bag(components.getCoder(stateSpec.getBagSpec().getElementCoderId()));
       case COMBINING_SPEC:
         FunctionSpec combineFnSpec = stateSpec.getCombiningSpec().getCombineFn().getSpec();
 
@@ -386,26 +383,16 @@ public class ParDoTranslation {
         // Rawtype coder cast because it is required to be a valid accumulator coder
         // for the CombineFn, by construction
         return StateSpecs.combining(
-            (Coder)
-                CoderTranslation.fromProto(
-                    components
-                        .getCodersMap()
-                        .get(stateSpec.getCombiningSpec().getAccumulatorCoderId()),
-                    components),
+            (Coder) components.getCoder(stateSpec.getCombiningSpec().getAccumulatorCoderId()),
             combineFn);
 
       case MAP_SPEC:
         return StateSpecs.map(
-            CoderTranslation.fromProto(
-                components.getCodersOrThrow(stateSpec.getMapSpec().getKeyCoderId()), components),
-            CoderTranslation.fromProto(
-                components.getCodersOrThrow(stateSpec.getMapSpec().getValueCoderId()), components));
+            components.getCoder(stateSpec.getMapSpec().getKeyCoderId()),
+            components.getCoder(stateSpec.getMapSpec().getValueCoderId()));
 
       case SET_SPEC:
-        return StateSpecs.set(
-            CoderTranslation.fromProto(
-                components.getCodersMap().get(stateSpec.getSetSpec().getElementCoderId()),
-                components));
+        return StateSpecs.set(components.getCoder(stateSpec.getSetSpec().getElementCoderId()));
 
       case SPEC_NOT_SET:
       default:
@@ -517,7 +504,7 @@ public class ParDoTranslation {
       String localName,
       PCollection<?> pCollection,
       RunnerApi.PTransform parDoTransform,
-      Components components)
+      RehydratedComponents components)
       throws IOException {
     checkArgument(
         localName != null,
@@ -527,20 +514,13 @@ public class ParDoTranslation {
     WindowMappingFn<?> windowMappingFn = windowMappingFnFromProto(sideInput.getWindowMappingFn());
     ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn());
 
-    RunnerApi.PCollection inputCollection =
-        components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(localName));
-    WindowingStrategy<?, ?> windowingStrategy =
-        WindowingStrategyTranslation.fromProto(
-            components.getWindowingStrategiesOrThrow(inputCollection.getWindowingStrategyId()),
-            components);
-    Coder<?> elemCoder =
-        CoderTranslation
-            .fromProto(components.getCodersOrThrow(inputCollection.getCoderId()), components);
+    WindowingStrategy<?, ?> windowingStrategy = pCollection.getWindowingStrategy().fixDefaults();
     Coder<Iterable<WindowedValue<?>>> coder =
         (Coder)
             IterableCoder.of(
                 FullWindowedValueCoder.of(
-                    elemCoder, windowingStrategy.getWindowFn().windowCoder()));
+                    pCollection.getCoder(),
+                    pCollection.getWindowingStrategy().getWindowFn().windowCoder()));
     checkArgument(
         sideInput.getAccessPattern().getUrn().equals(Materializations.ITERABLE_MATERIALIZATION_URN),
         "Unknown View Materialization URN %s",

http://git-wip-us.apache.org/repos/asf/beam/blob/01492e69/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
new file mode 100644
index 0000000..a9a34d7
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
+
+/**
+ * Vends Java SDK objects rehydrated from a Runner API {@link Components} collection.
+ *
+ * <p>This ensures maximum memoization of rehydrated components, which is semantically necessary for
+ * {@link PCollection} and nice-to-have for other objects.
+ */
+public class RehydratedComponents {
+  private final Components components;
+
+  /**
+   * This class may be used in the context of a pipeline or not. If not, then it cannot
+   * rehydrated {@link PCollection PCollections}.
+   */
+  @Nullable
+  private final Pipeline pipeline;
+
+  /**
+   * A non-evicting cache, serving as a memo table for rehydrated {@link WindowingStrategy
+   * WindowingStrategies}.
+   */
+  private final LoadingCache<String, WindowingStrategy<?, ?>> windowingStrategies =
+      CacheBuilder.newBuilder()
+          .build(
+              new CacheLoader<String, WindowingStrategy<?, ?>>() {
+                @Override
+                public WindowingStrategy<?, ?> load(String id) throws Exception {
+                  return WindowingStrategyTranslation.fromProto(
+                      components.getWindowingStrategiesOrThrow(id), RehydratedComponents.this);
+                }
+              });
+
+  /** A non-evicting cache, serving as a memo table for rehydrated {@link Coder Coders}. */
+  private final LoadingCache<String, Coder<?>> coders =
+      CacheBuilder.newBuilder()
+          .build(
+              new CacheLoader<String, Coder<?>>() {
+                @Override
+                public Coder<?> load(String id) throws Exception {
+                  return CoderTranslation.fromProto(
+                      components.getCodersOrThrow(id), RehydratedComponents.this);
+                }
+              });
+
+  /**
+   * A non-evicting cache, serving as a memo table for rehydrated {@link PCollection PCollections}.
+   */
+  private final LoadingCache<String, PCollection<?>> pCollections =
+      CacheBuilder.newBuilder()
+          .build(
+              new CacheLoader<String, PCollection<?>>() {
+                @Override
+                public PCollection<?> load(String id) throws Exception {
+                  checkState(
+                      pipeline != null,
+                      "%s Cannot rehydrate %s without a %s:"
+                          + " provide one via .withPipeline(...)",
+                      RehydratedComponents.class.getSimpleName(),
+                      PCollection.class.getSimpleName(),
+                      Pipeline.class.getSimpleName());
+                  return PCollectionTranslation.fromProto(
+                      components.getPcollectionsOrThrow(id), pipeline, RehydratedComponents.this);
+                }
+              });
+
+
+  /** Create a new {@link RehydratedComponents} from a Runner API {@link Components}. */
+  public static RehydratedComponents forComponents(RunnerApi.Components components) {
+    return new RehydratedComponents(components, null);
+  }
+
+  /** Create a new {@link RehydratedComponents} with a pipeline attached. */
+  public RehydratedComponents withPipeline(Pipeline pipeline) {
+    return new RehydratedComponents(components, pipeline);
+  }
+
+  private RehydratedComponents(RunnerApi.Components components, @Nullable Pipeline pipeline) {
+    this.components = components;
+    this.pipeline = pipeline;
+  }
+
+  /**
+   * Returns a {@link PCollection} rehydrated from the Runner API component with the given ID.
+   *
+   * <p>For a single instance of {@link RehydratedComponents}, this always returns the same instance
+   * for a particular id.
+   */
+  public PCollection<?> getPCollection(String pCollectionId) throws IOException {
+    try {
+      return pCollections.get(pCollectionId);
+    } catch (ExecutionException exc) {
+      throw new RuntimeException(exc);
+    }
+  }
+
+  /**
+   * Returns a {@link WindowingStrategy} rehydrated from the Runner API component with the given ID.
+   *
+   * <p>For a single instance of {@link RehydratedComponents}, this always returns the same instance
+   * for a particular id.
+   */
+  public WindowingStrategy<?, ?> getWindowingStrategy(String windowingStrategyId)
+      throws IOException {
+    try {
+      return windowingStrategies.get(windowingStrategyId);
+    } catch (ExecutionException exc) {
+      throw new RuntimeException(exc);
+    }
+  }
+
+  /**
+   * Returns a {@link Coder} rehydrated from the Runner API component with the given ID.
+   *
+   * <p>For a single instance of {@link RehydratedComponents}, this always returns the same instance
+   * for a particular id.
+   */
+  public Coder<?> getCoder(String coderId) throws IOException {
+    try {
+      return coders.get(coderId);
+    } catch (ExecutionException exc) {
+      throw new RuntimeException(exc);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/01492e69/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
index 515de57..f23b2ec 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TestStreamTranslation.java
@@ -63,13 +63,10 @@ public class TestStreamTranslation {
   }
 
   private static TestStream<?> fromProto(
-      RunnerApi.TestStreamPayload testStreamPayload, RunnerApi.Components components)
+      RunnerApi.TestStreamPayload testStreamPayload, RehydratedComponents components)
       throws IOException {
 
-    Coder<Object> coder =
-        (Coder<Object>)
-            CoderTranslation.fromProto(
-                components.getCodersOrThrow(testStreamPayload.getCoderId()), components);
+    Coder<Object> coder = (Coder<Object>) components.getCoder(testStreamPayload.getCoderId());
 
     List<TestStream.Event<Object>> events = new ArrayList<>();
 
@@ -101,7 +98,9 @@ public class TestStreamTranslation {
     RunnerApi.TestStreamPayload testStreamPayload =
         transformProto.getSpec().getParameter().unpack(RunnerApi.TestStreamPayload.class);
 
-    return (TestStream<T>) fromProto(testStreamPayload, sdkComponents.toComponents());
+    return (TestStream<T>)
+        fromProto(
+            testStreamPayload, RehydratedComponents.forComponents(sdkComponents.toComponents()));
   }
 
   static <T> RunnerApi.TestStreamPayload.Event toProto(TestStream.Event<T> event, Coder<T> coder)

http://git-wip-us.apache.org/repos/asf/beam/blob/01492e69/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index 1456a3f..d5bdea4 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -322,7 +322,9 @@ public class WindowingStrategyTranslation implements Serializable {
       throws InvalidProtocolBufferException {
     switch (proto.getRootCase()) {
       case WINDOWING_STRATEGY:
-        return fromProto(proto.getWindowingStrategy(), proto.getComponents());
+        return fromProto(
+            proto.getWindowingStrategy(),
+            RehydratedComponents.forComponents(proto.getComponents()));
       default:
         throw new IllegalArgumentException(
             String.format(
@@ -336,7 +338,7 @@ public class WindowingStrategyTranslation implements Serializable {
    * the provided components to dereferences identifiers found in the proto.
    */
   public static WindowingStrategy<?, ?> fromProto(
-      RunnerApi.WindowingStrategy proto, Components components)
+      RunnerApi.WindowingStrategy proto, RehydratedComponents components)
       throws InvalidProtocolBufferException {
 
     SdkFunctionSpec windowFnSpec = proto.getWindowFn();

http://git-wip-us.apache.org/repos/asf/beam/blob/01492e69/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
index 39549d0..95766f5 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
@@ -136,7 +136,9 @@ public class CoderTranslationTest {
       RunnerApi.Coder coderProto = CoderTranslation.toProto(coder, componentsBuilder);
 
       Components encodedComponents = componentsBuilder.toComponents();
-      Coder<?> decodedCoder = CoderTranslation.fromProto(coderProto, encodedComponents);
+      Coder<?> decodedCoder =
+          CoderTranslation.fromProto(
+              coderProto, RehydratedComponents.forComponents(encodedComponents));
       assertThat(decodedCoder, Matchers.<Coder<?>>equalTo(coder));
 
       if (KNOWN_CODERS.contains(coder)) {

http://git-wip-us.apache.org/repos/asf/beam/blob/01492e69/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java
index b3b42ab..0373fba 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CombineTranslationTest.java
@@ -103,8 +103,10 @@ public class CombineTranslationTest {
       CombinePayload combineProto = CombineTranslation.toProto(combine.get(), sdkComponents);
       RunnerApi.Components componentsProto = sdkComponents.toComponents();
 
-      assertEquals(combineFn.getAccumulatorCoder(pipeline.getCoderRegistry(), input.getCoder()),
-          CombineTranslation.getAccumulatorCoder(combineProto, componentsProto));
+      assertEquals(
+          combineFn.getAccumulatorCoder(pipeline.getCoderRegistry(), input.getCoder()),
+          CombineTranslation.getAccumulatorCoder(
+              combineProto, RehydratedComponents.forComponents(componentsProto)));
       assertEquals(combineFn, CombineTranslation.getCombineFn(combineProto));
     }
   }
@@ -146,7 +148,8 @@ public class CombineTranslationTest {
 
       assertEquals(
           combineFn.getAccumulatorCoder(pipeline.getCoderRegistry(), input.getCoder()),
-          CombineTranslation.getAccumulatorCoder(combineProto, componentsProto));
+          CombineTranslation.getAccumulatorCoder(
+              combineProto, RehydratedComponents.forComponents(componentsProto)));
       assertEquals(combineFn, CombineTranslation.getCombineFn(combineProto));
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/01492e69/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
index 5c45487..df02a39 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionTranslationTest.java
@@ -117,12 +117,13 @@ public class PCollectionTranslationTest {
     SdkComponents sdkComponents = SdkComponents.create();
     RunnerApi.PCollection protoCollection =
         PCollectionTranslation.toProto(testCollection, sdkComponents);
-    RunnerApi.Components protoComponents = sdkComponents.toComponents();
+    RehydratedComponents protoComponents =
+        RehydratedComponents.forComponents(sdkComponents.toComponents());
 
     // Decode
     Pipeline pipeline = Pipeline.create();
     PCollection<?> decodedCollection =
-        PCollectionTranslation.fromProto(pipeline, protoCollection, protoComponents);
+        PCollectionTranslation.fromProto(protoCollection, pipeline, protoComponents);
 
     // Verify
     assertThat(decodedCollection.getCoder(), Matchers.<Coder<?>>equalTo(testCollection.getCoder()));
@@ -138,10 +139,11 @@ public class PCollectionTranslationTest {
     SdkComponents sdkComponents = SdkComponents.create();
     RunnerApi.PCollection protoCollection = PCollectionTranslation
         .toProto(testCollection, sdkComponents);
-    RunnerApi.Components protoComponents = sdkComponents.toComponents();
-    Coder<?> decodedCoder = PCollectionTranslation.getCoder(protoCollection, protoComponents);
+    RehydratedComponents protoComponents =
+        RehydratedComponents.forComponents(sdkComponents.toComponents());
+    Coder<?> decodedCoder = protoComponents.getCoder(protoCollection.getCoderId());
     WindowingStrategy<?, ?> decodedStrategy =
-        PCollectionTranslation.getWindowingStrategy(protoCollection, protoComponents);
+        protoComponents.getWindowingStrategy(protoCollection.getWindowingStrategyId());
     IsBounded decodedIsBounded = PCollectionTranslation.isBounded(protoCollection);
 
     assertThat(decodedCoder, Matchers.<Coder<?>>equalTo(testCollection.getCoder()));

http://git-wip-us.apache.org/repos/asf/beam/blob/01492e69/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
index a87a16d..c31e803 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java
@@ -31,7 +31,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.ParDoPayload;
 import org.apache.beam.sdk.common.runner.v1.RunnerApi.SideInput;
 import org.apache.beam.sdk.io.GenerateSequence;
@@ -64,7 +63,6 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
@@ -151,7 +149,9 @@ public class ParDoTranslationTest {
               AppliedPTransform.<PCollection<KV<Long, String>>, PCollection<Void>, MultiOutput>of(
                   "foo", inputs, output.expand(), parDo, p),
               sdkComponents);
-      Components protoComponents = sdkComponents.toComponents();
+      RunnerApi.Components components = sdkComponents.toComponents();
+      RehydratedComponents rehydratedComponents =
+          RehydratedComponents.forComponents(components);
 
       // Decode
       Pipeline rehydratedPipeline = Pipeline.create();
@@ -166,7 +166,7 @@ public class ParDoTranslationTest {
                 view.getTagInternal().getId(),
                 view.getPCollection(),
                 protoTransform,
-                protoComponents);
+                rehydratedComponents);
         assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal()));
         assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass()));
         assertThat(
@@ -179,54 +179,41 @@ public class ParDoTranslationTest {
       }
       String mainInputId = sdkComponents.registerPCollection(mainInput);
       assertThat(
-          ParDoTranslation.getMainInput(protoTransform, protoComponents),
-          equalTo(protoComponents.getPcollectionsOrThrow(mainInputId)));
+          ParDoTranslation.getMainInput(protoTransform, components),
+          equalTo(components.getPcollectionsOrThrow(mainInputId)));
     }
   }
 
   /**
    * Tests for translating state and timer bits to/from protos.
    */
-  @RunWith(JUnit4.class)
+  @RunWith(Parameterized.class)
   public static class TestStateAndTimerTranslation {
 
-    @Test
-    public void testValueStateSpecToFromProto() throws Exception {
-      SdkComponents sdkComponents = SdkComponents.create();
-      StateSpec<?> stateSpec = StateSpecs.value(VarIntCoder.of());
-      StateSpec<?> deserializedStateSpec =
-          ParDoTranslation.fromProto(
-              ParDoTranslation.toProto(stateSpec, sdkComponents), sdkComponents.toComponents());
-      assertThat(stateSpec, Matchers.<StateSpec<?>>equalTo(deserializedStateSpec));
+    @Parameters(name = "{index}: {0}")
+    public static Iterable<StateSpec<?>> stateSpecs() {
+      return ImmutableList.of(
+          StateSpecs.value(VarIntCoder.of()),
+          StateSpecs.bag(VarIntCoder.of()),
+          StateSpecs.set(VarIntCoder.of()),
+          StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of()));
     }
 
-    @Test
-    public void testBagStateSpecToFromProto() throws Exception {
-      SdkComponents sdkComponents = SdkComponents.create();
-      StateSpec<?> stateSpec = StateSpecs.bag(VarIntCoder.of());
-      StateSpec<?> deserializedStateSpec =
-          ParDoTranslation.fromProto(
-              ParDoTranslation.toProto(stateSpec, sdkComponents), sdkComponents.toComponents());
-      assertThat(stateSpec, Matchers.<StateSpec<?>>equalTo(deserializedStateSpec));
-    }
+    @Parameter
+    public StateSpec<?> stateSpec;
 
     @Test
-    public void testSetStateSpecToFromProto() throws Exception {
+    public void testStateSpecToFromProto() throws Exception {
+      // Encode
       SdkComponents sdkComponents = SdkComponents.create();
-      StateSpec<?> stateSpec = StateSpecs.set(VarIntCoder.of());
-      StateSpec<?> deserializedStateSpec =
-          ParDoTranslation.fromProto(
-              ParDoTranslation.toProto(stateSpec, sdkComponents), sdkComponents.toComponents());
-      assertThat(stateSpec, Matchers.<StateSpec<?>>equalTo(deserializedStateSpec));
-    }
+      RunnerApi.StateSpec stateSpecProto = ParDoTranslation.toProto(stateSpec, sdkComponents);
 
-    @Test
-    public void testMapStateSpecToFromProto() throws Exception {
-      SdkComponents sdkComponents = SdkComponents.create();
-      StateSpec<?> stateSpec = StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of());
+      // Decode
+      RehydratedComponents rehydratedComponents =
+          RehydratedComponents.forComponents(sdkComponents.toComponents());
       StateSpec<?> deserializedStateSpec =
-          ParDoTranslation.fromProto(
-              ParDoTranslation.toProto(stateSpec, sdkComponents), sdkComponents.toComponents());
+          ParDoTranslation.fromProto(stateSpecProto, rehydratedComponents);
+
       assertThat(stateSpec, Matchers.<StateSpec<?>>equalTo(deserializedStateSpec));
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/01492e69/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/RehydratedComponentsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/RehydratedComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/RehydratedComponentsTest.java
new file mode 100644
index 0000000..0da487b
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/RehydratedComponentsTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.theInstance;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link RehydratedComponents}.
+ *
+ * <p>These are basic sanity checks. The most thorough testing of this is by extensive use in all
+ * other rehydration. The two are tightly coupled, as they recursively invoke each other.
+ */
+@RunWith(JUnit4.class)
+public class RehydratedComponentsTest {
+
+  @Test
+  public void testSimpleCoder() throws Exception {
+    SdkComponents sdkComponents = SdkComponents.create();
+    Coder<?> coder = VarIntCoder.of();
+    String id = sdkComponents.registerCoder(coder);
+    RehydratedComponents rehydratedComponents =
+        RehydratedComponents.forComponents(sdkComponents.toComponents());
+
+    Coder<?> rehydratedCoder = rehydratedComponents.getCoder(id);
+    assertThat(rehydratedCoder, equalTo((Coder) coder));
+    assertThat(rehydratedComponents.getCoder(id), theInstance((Coder) rehydratedCoder));
+  }
+
+  @Test
+  public void testCompoundCoder() throws Exception {
+    SdkComponents sdkComponents = SdkComponents.create();
+    Coder<?> coder = VarIntCoder.of();
+    Coder<?> compoundCoder = NullableCoder.of(coder);
+    String compoundCoderId = sdkComponents.registerCoder(compoundCoder);
+    String coderId = sdkComponents.registerCoder(coder);
+
+    RehydratedComponents rehydratedComponents =
+        RehydratedComponents.forComponents(sdkComponents.toComponents());
+
+    Coder<?> rehydratedCoder = rehydratedComponents.getCoder(coderId);
+    Coder<?> rehydratedCompoundCoder = rehydratedComponents.getCoder(compoundCoderId);
+
+    assertThat(rehydratedCoder, equalTo((Coder) coder));
+    assertThat(rehydratedCompoundCoder, equalTo((Coder) compoundCoder));
+
+    assertThat(rehydratedComponents.getCoder(coderId), theInstance((Coder) rehydratedCoder));
+    assertThat(
+        rehydratedComponents.getCoder(compoundCoderId),
+        theInstance((Coder) rehydratedCompoundCoder));
+  }
+
+  @Test
+  public void testWindowingStrategy() throws Exception {
+    SdkComponents sdkComponents = SdkComponents.create();
+    WindowingStrategy windowingStrategy =
+        WindowingStrategy.of(FixedWindows.of(Duration.millis(1)))
+            .withAllowedLateness(Duration.standardSeconds(4));
+    String id = sdkComponents.registerWindowingStrategy(windowingStrategy);
+    RehydratedComponents rehydratedComponents =
+        RehydratedComponents.forComponents(sdkComponents.toComponents());
+
+    WindowingStrategy<?, ?> rehydratedStrategy = rehydratedComponents.getWindowingStrategy(id);
+    assertThat(rehydratedStrategy, equalTo((WindowingStrategy) windowingStrategy.fixDefaults()));
+    assertThat(
+        rehydratedComponents.getWindowingStrategy(id),
+        theInstance((WindowingStrategy) rehydratedStrategy));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/01492e69/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
index b2029be..e4336df 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TestStreamTranslationTest.java
@@ -82,7 +82,8 @@ public class TestStreamTranslationTest {
       RunnerApi.TestStreamPayload payload =
           TestStreamTranslation.testStreamToPayload(testStream, components);
 
-      verifyTestStreamEncoding(testStream, payload, components.toComponents());
+      verifyTestStreamEncoding(
+          testStream, payload, RehydratedComponents.forComponents(components.toComponents()));
     }
 
     @Test
@@ -102,19 +103,19 @@ public class TestStreamTranslationTest {
       RunnerApi.TestStreamPayload payload =
           spec.getParameter().unpack(RunnerApi.TestStreamPayload.class);
 
-      verifyTestStreamEncoding(testStream, payload, components.toComponents());
+      verifyTestStreamEncoding(
+          testStream, payload, RehydratedComponents.forComponents(components.toComponents()));
     }
 
     private static <T> void verifyTestStreamEncoding(
         TestStream<T> testStream,
         RunnerApi.TestStreamPayload payload,
-        RunnerApi.Components protoComponents)
+        RehydratedComponents protoComponents)
         throws Exception {
 
       // This reverse direction is only valid for Java-based coders
       assertThat(
-          CoderTranslation.fromProto(
-              protoComponents.getCodersOrThrow(payload.getCoderId()), protoComponents),
+          protoComponents.getCoder(payload.getCoderId()),
           Matchers.<Coder<?>>equalTo(testStream.getValueCoder()));
 
       assertThat(payload.getEventsList().size(), equalTo(testStream.getEvents().size()));

http://git-wip-us.apache.org/repos/asf/beam/blob/01492e69/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
index 7a57fd7..e57a088 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
@@ -108,13 +108,14 @@ public class WindowingStrategyTranslationTest {
     SdkComponents components = SdkComponents.create();
     RunnerApi.WindowingStrategy proto =
         WindowingStrategyTranslation.toProto(windowingStrategy, components);
-    RunnerApi.Components protoComponents = components.toComponents();
+    RehydratedComponents protoComponents =
+        RehydratedComponents.forComponents(components.toComponents());
 
     assertThat(
         WindowingStrategyTranslation.fromProto(proto, protoComponents).fixDefaults(),
         Matchers.<WindowingStrategy<?, ?>>equalTo(windowingStrategy.fixDefaults()));
 
-    protoComponents.getCodersOrThrow(
+    protoComponents.getCoder(
         components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
     assertThat(
         proto.getAssignsToOneWindow(),

http://git-wip-us.apache.org/repos/asf/beam/blob/01492e69/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 7335ef7..762ac9f 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -63,6 +63,7 @@ import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory;
 import org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
+import org.apache.beam.runners.core.construction.RehydratedComponents;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
@@ -1089,6 +1090,19 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       private final Collection<byte[]> elements;
       private final RunnerApi.MessageWithComponents coderSpec;
 
+      // lazily initialized by parsing coderSpec
+      private transient Coder<T> coder;
+      private Coder<T> getCoder() throws IOException {
+        if (coder == null) {
+          coder =
+              (Coder)
+                  CoderTranslation.fromProto(
+                      coderSpec.getCoder(),
+                      RehydratedComponents.forComponents(coderSpec.getComponents()));
+        }
+        return coder;
+      }
+
       private DecodeAndEmitDoFn(Collection<byte[]> elements, Coder<T> coder) throws IOException {
         this.elements = elements;
         this.coderSpec = CoderTranslation.toProto(coder);
@@ -1096,8 +1110,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
       @ProcessElement
       public void processElement(ProcessContext context) throws IOException {
-        Coder<T> coder =
-            (Coder) CoderTranslation.fromProto(coderSpec.getCoder(), coderSpec.getComponents());
         for (byte[] element : elements) {
           context.output(CoderUtils.decodeFromByteArray(coder, element));
         }


Mime
View raw message