beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-4271) Executable stages allow side input coders to be set and/or queried
Date Mon, 21 May 2018 16:21:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-4271?focusedWorklogId=104107&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104107 ]

ASF GitHub Bot logged work on BEAM-4271:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/May/18 16:20
            Start Date: 21/May/18 16:20
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #5374: [BEAM-4271] Support side inputs for ExecutableStage and provide runner side utilities for handling multimap side inputs.
URL: https://github.com/apache/beam/pull/5374
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff once a foreign pull request is merged, it is reproduced
below for the sake of provenance:

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java
index 06450f9efec..b2f06982730 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java
@@ -75,6 +75,7 @@ public static FnApiControlClient forRequestObserver(
     return new FnApiControlClient(workerId, requestObserver);
   }
 
+  @Override
   public CompletionStage<BeamFnApi.InstructionResponse> handle(
       BeamFnApi.InstructionRequest request) {
     LOG.debug("Sending InstructionRequest {}", request);
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
index d9fa6e1d334..9205d12681e 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
@@ -18,15 +18,18 @@
 
 package org.apache.beam.runners.fnexecution.control;
 
+import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.runners.core.construction.SyntheticComponents.uniqueId;
 
 import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableTable;
 import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor.Builder;
@@ -37,21 +40,64 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.MessageWithComponents;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.runners.core.construction.SyntheticComponents;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.apache.beam.runners.core.construction.graph.SideInputReference;
 import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
+import org.apache.beam.runners.fnexecution.wire.LengthPrefixUnknownCoders;
 import org.apache.beam.runners.fnexecution.wire.WireCoders;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
 import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.sdk.values.KV;
 
-/** Utility methods for creating {@link ProcessBundleDescriptor} instances. */
+/**
+ * Utility methods for creating {@link ProcessBundleDescriptor} instances.
+ */
 // TODO: Rename to ExecutableStages?
 public class ProcessBundleDescriptors {
+
+  /**
+   * Note that the {@link ProcessBundleDescriptor} is constructed by:
+   * <ul>
+   *   <li>Adding gRPC read and write nodes wiring them to the specified data endpoint.</li>
+   *   <li>Setting the state {@link ApiServiceDescriptor} to the specified state endpoint.</li>
+   *   <li>Modifying the coder on PCollections that are accessed as side inputs to be length
+   *   prefixed making them binary compatible with the coder chosen when that side input is
+   *   materialized.</li>
+   * </ul>
+   */
+  public static ExecutableProcessBundleDescriptor fromExecutableStage(
+      String id,
+      ExecutableStage stage,
+      ApiServiceDescriptor dataEndpoint,
+      ApiServiceDescriptor stateEndpoint) throws IOException {
+    checkState(id != null, "id must be specified.");
+    checkState(stage != null, "stage must be specified.");
+    checkState(dataEndpoint != null, "dataEndpoint must be specified.");
+    checkState(stateEndpoint != null, "stateEndpoint must be specified.");
+    return fromExecutableStageInternal(id, stage, dataEndpoint, stateEndpoint);
+  }
+
   public static ExecutableProcessBundleDescriptor fromExecutableStage(
       String id, ExecutableStage stage, ApiServiceDescriptor dataEndpoint) throws IOException {
+    checkState(id != null, "id must be specified.");
+    checkState(stage != null, "stage must be specified.");
+    checkState(dataEndpoint != null, "dataEndpoint must be specified.");
+    return fromExecutableStageInternal(id, stage, dataEndpoint, null);
+  }
+
+  private static ExecutableProcessBundleDescriptor fromExecutableStageInternal(
+      String id,
+      ExecutableStage stage,
+      ApiServiceDescriptor dataEndpoint,
+      @Nullable ApiServiceDescriptor stateEndpoint) throws IOException {
     Components components = stage.getComponents();
     // Create with all of the processing transforms, and all of the components.
     // TODO: Remove the unreachable subcomponents if the size of the descriptor matters.
@@ -68,6 +114,9 @@ public static ExecutableProcessBundleDescriptor fromExecutableStage(
                     .stream()
                     .collect(
                         Collectors.toMap(PTransformNode::getId, PTransformNode::getTransform)));
+    if (stateEndpoint != null) {
+      bundleDescriptorBuilder.setStateApiServiceDescriptor(stateEndpoint);
+    }
 
     RemoteInputDestination<WindowedValue<?>> inputDestination =
         addStageInput(
@@ -80,8 +129,14 @@ public static ExecutableProcessBundleDescriptor fromExecutableStage(
       outputTargetCoders.put(targetEncoding.getTarget(), targetEncoding.getCoder());
     }
 
+    Map<String, Map<String, MultimapSideInputSpec>> multimapSideInputSpecs =
+        forMultimapSideInputs(stage, components, bundleDescriptorBuilder);
+
     return ExecutableProcessBundleDescriptor.of(
-        bundleDescriptorBuilder.build(), inputDestination, outputTargetCoders);
+        bundleDescriptorBuilder.build(),
+        inputDestination,
+        outputTargetCoders,
+        multimapSideInputSpecs);
   }
 
   private static RemoteInputDestination<WindowedValue<?>> addStageInput(
@@ -146,6 +201,57 @@ private static TargetEncoding addStageOutput(
         wireCoder);
   }
 
+  private static Map<String, Map<String, MultimapSideInputSpec>> forMultimapSideInputs(
+      ExecutableStage stage,
+      Components components,
+      ProcessBundleDescriptor.Builder bundleDescriptorBuilder) throws IOException {
+    ImmutableTable.Builder<String, String, MultimapSideInputSpec> idsToSpec =
+        ImmutableTable.builder();
+    for (SideInputReference sideInputReference : stage.getSideInputs()) {
+      // Update the coder specification for side inputs to be length prefixed so that the
+      // SDK and Runner agree on how to encode/decode the key, window, and values for multimap
+      // side inputs.
+      String pCollectionId = sideInputReference.collection().getId();
+      RunnerApi.MessageWithComponents lengthPrefixedSideInputCoder =
+          LengthPrefixUnknownCoders.forCoder(
+              components.getPcollectionsOrThrow(pCollectionId).getCoderId(),
+              components,
+              false);
+      String lengthPrefixedSideInputCoderId = SyntheticComponents.uniqueId(
+          String.format(
+              "fn/side_input/%s",
+              components.getPcollectionsOrThrow(pCollectionId).getCoderId()),
+          bundleDescriptorBuilder.getCodersMap().keySet()::contains);
+
+      bundleDescriptorBuilder.putCoders(
+          lengthPrefixedSideInputCoderId, lengthPrefixedSideInputCoder.getCoder());
+      bundleDescriptorBuilder.putAllCoders(
+          lengthPrefixedSideInputCoder.getComponents().getCodersMap());
+      bundleDescriptorBuilder.putPcollections(
+          pCollectionId,
+          bundleDescriptorBuilder
+              .getPcollectionsMap()
+              .get(pCollectionId)
+              .toBuilder()
+              .setCoderId(lengthPrefixedSideInputCoderId)
+              .build());
+
+      FullWindowedValueCoder<KV<?, ?>> coder =
+          (FullWindowedValueCoder) WireCoders.instantiateRunnerWireCoder(
+              sideInputReference.collection(), components);
+      idsToSpec.put(
+          sideInputReference.transform().getId(),
+          sideInputReference.localName(),
+          MultimapSideInputSpec.of(
+              sideInputReference.transform().getId(),
+              sideInputReference.localName(),
+              ((KvCoder) coder.getValueCoder()).getKeyCoder(),
+              ((KvCoder) coder.getValueCoder()).getValueCoder(),
+              coder.getWindowCoder()));
+    }
+    return idsToSpec.build().rowMap();
+  }
+
   @AutoValue
   abstract static class TargetEncoding {
     abstract BeamFnApi.Target getTarget();
@@ -153,6 +259,29 @@ private static TargetEncoding addStageOutput(
     abstract Coder<WindowedValue<?>> getCoder();
   }
 
+  /**
+   * A container type storing references to the key, value, and window {@link Coder} used when
+   * handling multimap side input state requests.
+   */
+  @AutoValue
+  public abstract static class MultimapSideInputSpec<K, V, W extends BoundedWindow> {
+    static <K, V, W extends BoundedWindow> MultimapSideInputSpec<K, V, W> of(
+        String transformId,
+        String sideInputId,
+        Coder<K> keyCoder,
+        Coder<V> valueCoder,
+        Coder<W> windowCoder) {
+      return new AutoValue_ProcessBundleDescriptors_MultimapSideInputSpec(
+          transformId, sideInputId, keyCoder, valueCoder, windowCoder);
+    }
+
+    public abstract String transformId();
+    public abstract String sideInputId();
+    public abstract Coder<K> keyCoder();
+    public abstract Coder<V> valueCoder();
+    public abstract Coder<W> windowCoder();
+  }
+
   /**
    * Add a {@link RunnerApi.Coder} suitable for using as the wire coder to the provided {@link
    * ProcessBundleDescriptor.Builder} and return the ID of that Coder.
@@ -179,9 +308,20 @@ private static String addWireCoder(
     public static ExecutableProcessBundleDescriptor of(
         ProcessBundleDescriptor descriptor,
         RemoteInputDestination<WindowedValue<?>> inputDestination,
-        Map<BeamFnApi.Target, Coder<WindowedValue<?>>> outputTargetCoders) {
+        Map<BeamFnApi.Target, Coder<WindowedValue<?>>> outputTargetCoders,
+        Map<String, Map<String, MultimapSideInputSpec>> multimapSideInputSpecs) {
+      ImmutableTable.Builder copyOfMultimapSideInputSpecs = ImmutableTable.builder();
+      for (Map.Entry<String, Map<String, MultimapSideInputSpec>> outer
+          : multimapSideInputSpecs.entrySet()) {
+        for (Map.Entry<String, MultimapSideInputSpec> inner : outer.getValue().entrySet()) {
+          copyOfMultimapSideInputSpecs.put(outer.getKey(), inner.getKey(), inner.getValue());
+        }
+      }
       return new AutoValue_ProcessBundleDescriptors_ExecutableProcessBundleDescriptor(
-          descriptor, inputDestination, Collections.unmodifiableMap(outputTargetCoders));
+          descriptor,
+          inputDestination,
+          Collections.unmodifiableMap(outputTargetCoders),
+          copyOfMultimapSideInputSpecs.build().rowMap());
     }
 
     public abstract ProcessBundleDescriptor getProcessBundleDescriptor();
@@ -197,5 +337,12 @@ public static ExecutableProcessBundleDescriptor of(
      * java {@link Coder} for the wire format of that {@link BeamFnApi.Target}.
      */
     public abstract Map<BeamFnApi.Target, Coder<WindowedValue<?>>> getOutputTargetCoders();
+
+
+    /**
+     * Get a mapping from PTransform id to multimap side input id to {@link MultimapSideInputSpec
+     * multimap side inputs} that are used during execution.
+     */
+    public abstract Map<String, Map<String, MultimapSideInputSpec>> getMultimapSideInputSpecs();
   }
 }
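The nested map returned by `getMultimapSideInputSpecs` keys first on the PTransform id and then on the side input's local name (it is built from an `ImmutableTable` via `rowMap()`). A minimal stand-in for that two-level lookup, with the spec type simplified to `String` (`SideInputSpecTable` and its methods are illustrative names, not Beam APIs):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the transform-id -> side-input-id -> spec table.
public class SideInputSpecTable {
  private final Map<String, Map<String, String>> specs = new HashMap<>();

  public void put(String transformId, String sideInputId, String spec) {
    specs.computeIfAbsent(transformId, k -> new HashMap<>()).put(sideInputId, spec);
  }

  /** Mirrors the runner-side lookup: first by transform id, then by local name. */
  public String get(String transformId, String sideInputId) {
    return specs.getOrDefault(transformId, Collections.emptyMap()).get(sideInputId);
  }
}
```

This is the same lookup shape the state handler performs when resolving a `StateKey.MultimapSideInput` to its coders.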
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
index 51e50bc4812..795b25cb7b7 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
@@ -23,6 +23,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
@@ -39,20 +40,38 @@ public static GrpcStateService create() {
     return new GrpcStateService();
   }
 
+  private final ConcurrentLinkedQueue<Inbound> clients;
   private final ConcurrentMap<String, StateRequestHandler> requestHandlers;
 
   private GrpcStateService() {
     this.requestHandlers = new ConcurrentHashMap<>();
+    this.clients = new ConcurrentLinkedQueue<>();
   }
 
   @Override
-  public void close() {
-    // TODO: Track multiple clients and disconnect them cleanly instead of forcing termination
+  public void close() throws Exception {
+    Exception thrown = null;
+    for (Inbound inbound : clients) {
+      try {
+        inbound.outboundObserver.onCompleted();
+      } catch (Exception t) {
+        if (thrown == null) {
+          thrown = t;
+        } else {
+          thrown.addSuppressed(t);
+        }
+      }
+    }
+    if (thrown != null) {
+      throw thrown;
+    }
   }
 
   @Override
   public StreamObserver<StateRequest> state(StreamObserver<StateResponse> responseObserver) {
-    return new Inbound(responseObserver);
+    Inbound rval = new Inbound(responseObserver);
+    clients.add(rval);
+    return rval;
   }
 
   @Override
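The rewritten `close()` above follows the standard Java pattern for shutting down many resources: complete every tracked client, remember the first exception, and attach later ones via `addSuppressed`. A standalone sketch of that pattern, with `StubObserver` standing in for the gRPC outbound observer (names here are illustrative, not Beam types):

```java
import java.util.List;

// Sketch of GrpcStateService.close(): try to complete every client,
// keep the first failure, attach subsequent failures as suppressed.
public class CloseAll {
  interface StubObserver { void onCompleted() throws Exception; }

  public static void closeAll(List<StubObserver> clients) throws Exception {
    Exception thrown = null;
    for (StubObserver client : clients) {
      try {
        client.onCompleted();
      } catch (Exception e) {
        if (thrown == null) {
          thrown = e;               // first failure wins
        } else {
          thrown.addSuppressed(e);  // later failures ride along
        }
      }
    }
    if (thrown != null) {
      throw thrown;
    }
  }
}
```

Every client still gets its `onCompleted()` call even when an earlier one fails, which is the improvement over the old forced-termination TODO.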
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java
new file mode 100644
index 00000000000..ecfe34fb76f
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlers.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.state;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.protobuf.ByteString;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateGetResponse;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey.TypeCase;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.ExecutableProcessBundleDescriptor;
+import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.MultimapSideInputSpec;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.stream.DataStreams;
+import org.apache.beam.sdk.fn.stream.DataStreams.ElementDelimitedOutputStream;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.common.Reiterable;
+
+/**
+ * A set of utility methods which construct {@link StateRequestHandler}s.
+ *
+ * <p>TODO: Add a variant which works on {@link ByteString}s to remove encoding/decoding overhead.
+ */
+public class StateRequestHandlers {
+
+  /**
+   * A handler for multimap side inputs.
+   *
+   * <p>Note that this handler is expected to be thread safe as it will be invoked concurrently.
+   */
+  @ThreadSafe
+  public interface MultimapSideInputHandler<K, V, W extends BoundedWindow> {
+    /**
+     * Returns an {@link Iterable} of values representing the side input for the given key and
+     * window.
+     *
+     * <p>TODO: Add support for side input chunking and caching if a {@link Reiterable} is returned.
+     */
+    Iterable<V> get(K key, W window);
+  }
+
+  /**
+   * A factory which constructs {@link MultimapSideInputHandler}s.
+   *
+   * <p>Note that this factory should be thread safe because it will be invoked concurrently.
+   */
+  @ThreadSafe
+  public interface MultimapSideInputHandlerFactory {
+
+    /**
+     * Returns a {@link MultimapSideInputHandler} for the given {@code pTransformId} and
+     * {@code sideInputId}. The supplied {@code keyCoder}, {@code valueCoder}, and
+     * {@code windowCoder} should be used to encode/decode their respective values.
+     */
+    <K, V, W extends BoundedWindow> MultimapSideInputHandler<K, V, W> forSideInput(
+        String pTransformId,
+        String sideInputId,
+        Coder<K> keyCoder,
+        Coder<V> valueCoder,
+        Coder<W> windowCoder);
+
+    /**
+     * Throws an {@link UnsupportedOperationException} on any access.
+     */
+    static MultimapSideInputHandlerFactory unsupported() {
+      return new MultimapSideInputHandlerFactory() {
+        @Override
+        public <K, V, W extends BoundedWindow> MultimapSideInputHandler<K, V, W> forSideInput(
+            String pTransformId, String sideInputId, Coder<K> keyCoder, Coder<V> valueCoder,
+            Coder<W> windowCoder) {
+          throw new UnsupportedOperationException(String.format(
+              "The %s does not support handling side inputs for PTransform %s with side "
+                  + "input id %s.",
+              MultimapSideInputHandler.class.getSimpleName(),
+              pTransformId,
+              sideInputId));
+        }
+      };
+    }
+  }
+
+  /**
+   * A handler for bag user state.
+   *
+   * <p>Note that this handler is expected to be thread safe as it will be invoked concurrently.
+   */
+  @ThreadSafe
+  public interface BagUserStateHandler<K, V, W extends BoundedWindow> {
+    /**
+     * Returns an {@link Iterable} of values representing the bag user state for the given key and
+     * window.
+     *
+     * <p>TODO: Add support for bag user state chunking and caching if a {@link Reiterable} is
+     * returned.
+     */
+    Iterable<V> get(K key, W window);
+
+    /**
+     * Appends the values to the bag user state for the given key and window.
+     */
+    void append(K key, W window, Iterator<V> values);
+
+    /**
+     * Clears the bag user state for the given key and window.
+     */
+    void clear(K key, W window);
+  }
+
+  /**
+   * A factory which constructs {@link BagUserStateHandler}s.
+   *
+   * <p>Note that this factory should be thread safe.
+   */
+  @ThreadSafe
+  public interface BagUserStateHandlerFactory {
+    <K, V, W extends BoundedWindow> BagUserStateHandler<K, V, W> forUserState(
+        String pTransformId,
+        String userStateId,
+        Coder<K> keyCoder,
+        Coder<V> valueCoder,
+        Coder<W> windowCoder);
+
+    /**
+     * Throws an {@link UnsupportedOperationException} on any access.
+     */
+    static BagUserStateHandlerFactory unsupported() {
+      return new BagUserStateHandlerFactory() {
+        @Override
+        public <K, V, W extends BoundedWindow> BagUserStateHandler<K, V, W> forUserState(
+            String pTransformId, String userStateId, Coder<K> keyCoder, Coder<V> valueCoder,
+            Coder<W> windowCoder) {
+          throw new UnsupportedOperationException(String.format(
+              "The %s does not support handling user state for PTransform %s with user state "
+                  + "id %s.",
+              BagUserStateHandler.class.getSimpleName(),
+              pTransformId,
+              userStateId));
+        }
+      };
+    }
+  }
+
+  /**
+   * Returns an adapter which converts a {@link MultimapSideInputHandlerFactory} to a
+   * {@link StateRequestHandler}.
+   *
+   * <p>The {@link MultimapSideInputHandlerFactory} is required to handle all multimap side inputs
+   * contained within the {@link ExecutableProcessBundleDescriptor}. See
+   * {@link ExecutableProcessBundleDescriptor#getMultimapSideInputSpecs} for the set of multimap
+   * side inputs that are contained.
+   *
+   * <p>Instances of {@link MultimapSideInputHandler}s returned by the
+   * {@link MultimapSideInputHandlerFactory} are cached.
+   */
+  public static StateRequestHandler forMultimapSideInputHandlerFactory(
+      ExecutableProcessBundleDescriptor processBundleDescriptor,
+      MultimapSideInputHandlerFactory multimapSideInputHandlerFactory) {
+    return new StateRequestHandlerToMultimapSideInputHandlerFactoryAdapter(
+        processBundleDescriptor, multimapSideInputHandlerFactory);
+  }
+
+  /**
+   * An adapter which converts {@link MultimapSideInputHandlerFactory} to
+   * {@link StateRequestHandler}.
+   */
+  static class StateRequestHandlerToMultimapSideInputHandlerFactoryAdapter
+      implements StateRequestHandler {
+
+    private final ExecutableProcessBundleDescriptor processBundleDescriptor;
+    private final MultimapSideInputHandlerFactory multimapSideInputHandlerFactory;
+    private final ConcurrentHashMap<MultimapSideInputSpec, MultimapSideInputHandler> cache;
+
+    StateRequestHandlerToMultimapSideInputHandlerFactoryAdapter(
+        ExecutableProcessBundleDescriptor processBundleDescriptor,
+        MultimapSideInputHandlerFactory multimapSideInputHandlerFactory) {
+      this.processBundleDescriptor = processBundleDescriptor;
+      this.multimapSideInputHandlerFactory = multimapSideInputHandlerFactory;
+      this.cache = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public CompletionStage<StateResponse.Builder> handle(
+        StateRequest request) throws Exception {
+      try {
+        checkState(TypeCase.MULTIMAP_SIDE_INPUT.equals(request.getStateKey().getTypeCase()),
+            "Unsupported %s type %s, expected %s",
+            StateRequest.class.getSimpleName(),
+            request.getStateKey().getTypeCase(),
+            TypeCase.MULTIMAP_SIDE_INPUT);
+
+        StateKey.MultimapSideInput stateKey = request.getStateKey().getMultimapSideInput();
+        MultimapSideInputSpec<?, ?, ?> sideInputReferenceSpec =
+            processBundleDescriptor.getMultimapSideInputSpecs()
+                .get(stateKey.getPtransformId())
+                .get(stateKey.getSideInputId());
+        MultimapSideInputHandler<?, ?, ?> handler = cache.computeIfAbsent(
+            sideInputReferenceSpec,
+            this::createHandler);
+
+        switch (request.getRequestCase()) {
+          case GET:
+            return handleGetRequest(request, handler);
+          case APPEND:
+          case CLEAR:
+          default:
+            throw new Exception(String.format(
+                "Unsupported request type %s for side input.", request.getRequestCase()));
+        }
+      } catch (Exception e) {
+        CompletableFuture f = new CompletableFuture();
+        f.completeExceptionally(e);
+        return f;
+      }
+    }
+
+    private <K, V, W extends BoundedWindow> CompletionStage<StateResponse.Builder> handleGetRequest(
+        StateRequest request, MultimapSideInputHandler<K, V, W> handler) throws Exception {
+      // TODO: Add support for continuation tokens when handling state if the handler
+      // returned a {@link Reiterable}.
+      checkState(request.getGet().getContinuationToken().isEmpty(),
+          "Continuation tokens are unsupported.");
+
+      StateKey.MultimapSideInput stateKey = request.getStateKey().getMultimapSideInput();
+
+      MultimapSideInputSpec<K, V, W> sideInputReferenceSpec =
+          processBundleDescriptor.getMultimapSideInputSpecs()
+              .get(stateKey.getPtransformId())
+              .get(stateKey.getSideInputId());
+
+      K key = sideInputReferenceSpec.keyCoder().decode(stateKey.getKey().newInput());
+      W window = sideInputReferenceSpec.windowCoder().decode(stateKey.getWindow().newInput());
+
+      Iterable<V> values = handler.get(key, window);
+      List<ByteString> encodedValues = new ArrayList<>();
+      ElementDelimitedOutputStream outputStream = DataStreams.outbound(encodedValues::add);
+      for (V value : values) {
+        sideInputReferenceSpec.valueCoder().encode(value, outputStream);
+        outputStream.delimitElement();
+      }
+      outputStream.close();
+
+      StateResponse.Builder response = StateResponse.newBuilder();
+      response.setId(request.getId());
+      response.setGet(
+          StateGetResponse.newBuilder()
+              .setData(ByteString.copyFrom(encodedValues))
+              .build());
+      return CompletableFuture.completedFuture(response);
+    }
+
+    private <K, V, W extends BoundedWindow> MultimapSideInputHandler<K, V, W> createHandler(
+        MultimapSideInputSpec cacheKey) {
+      return multimapSideInputHandlerFactory.forSideInput(
+          cacheKey.transformId(),
+          cacheKey.sideInputId(),
+          cacheKey.keyCoder(),
+          cacheKey.valueCoder(),
+          cacheKey.windowCoder());
+    }
+  }
+}
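The adapter above resolves a `MultimapSideInputSpec` from the state key and then caches one handler per spec through `ConcurrentHashMap.computeIfAbsent`, so the factory is invoked at most once per spec. A simplified, Beam-free analogue of that factory-plus-cache arrangement (`HandlerCache`, `Handler`, and `Factory` are illustrative names):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the adapter's caching: one handler per (transformId, sideInputId),
// created lazily by the factory and reused on every subsequent request.
public class HandlerCache {
  interface Handler { List<String> get(String key); }
  interface Factory { Handler forSideInput(String transformId, String sideInputId); }

  private final Factory factory;
  private final ConcurrentHashMap<String, Handler> cache = new ConcurrentHashMap<>();

  public HandlerCache(Factory factory) { this.factory = factory; }

  public Handler handlerFor(String transformId, String sideInputId) {
    return cache.computeIfAbsent(
        transformId + "/" + sideInputId,
        id -> factory.forSideInput(transformId, sideInputId));
  }
}
```

In the real adapter the cache key is the `MultimapSideInputSpec` itself and the handler also receives the spec's key, value, and window coders; the caching behavior is the same.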
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 764ddca7b47..9e44fe5380a 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -22,10 +22,13 @@
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.Serializable;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -53,6 +56,11 @@
 import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
 import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
 import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.MultimapSideInputHandler;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandlers.MultimapSideInputHandlerFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -66,10 +74,14 @@
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -84,7 +96,9 @@
 public class RemoteExecutionTest implements Serializable {
   private transient GrpcFnServer<FnApiControlClientPoolService> controlServer;
   private transient GrpcFnServer<GrpcDataService> dataServer;
+  private transient GrpcFnServer<GrpcStateService> stateServer;
   private transient GrpcFnServer<GrpcLoggingService> loggingServer;
+  private transient GrpcStateService stateDelegator;
   private transient SdkHarnessClient controlClient;
 
   private transient ExecutorService serverExecutor;
@@ -102,6 +116,10 @@ public void setup() throws Exception {
     loggingServer =
         GrpcFnServer.allocatePortAndCreateFor(
             GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory);
+    stateDelegator = GrpcStateService.create();
+    stateServer =
+        GrpcFnServer.allocatePortAndCreateFor(
+            stateDelegator, serverFactory);
 
     ControlClientPool clientPool = MapControlClientPool.create();
     controlServer =
@@ -129,6 +147,7 @@ public void setup() throws Exception {
   @After
   public void tearDown() throws Exception {
     controlServer.close();
+    stateServer.close();
     dataServer.close();
     loggingServer.close();
     controlClient.close();
@@ -206,9 +225,119 @@ public void process(ProcessContext ctxt) {
     }
   }
 
+  @Test
+  public void testExecutionWithSideInput() throws Exception {
+    Pipeline p = Pipeline.create();
+    PCollection<String> input = p.apply("impulse", Impulse.create())
+        .apply(
+            "create",
+            ParDo.of(
+                new DoFn<byte[], String>() {
+                  @ProcessElement
+                  public void process(ProcessContext ctxt) {
+                    ctxt.output("zero");
+                    ctxt.output("one");
+                    ctxt.output("two");
+                  }
+                })).setCoder(StringUtf8Coder.of());
+    PCollectionView<Iterable<String>> view =
+        input.apply("createSideInput", View.asIterable());
+
+    input
+        .apply("readSideInput", ParDo.of(new DoFn<String, KV<String, String>>() {
+          @ProcessElement
+          public void processElement(ProcessContext context) {
+            for (String value : context.sideInput(view)) {
+              context.output(KV.of(context.element(), value));
+            }
+          }
+        }).withSideInputs(view))
+        .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+        // Force the output to be materialized
+        .apply("gbk", GroupByKey.create());
+
+    RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
+    Components components = pipelineProto.getComponents();
+    FusedPipeline fused = GreedyPipelineFuser.fuse(pipelineProto);
+    Optional<ExecutableStage> optionalStage = Iterables.tryFind(
+        fused.getFusedStages(),
+        (ExecutableStage stage) -> !stage.getSideInputs().isEmpty());
+    checkState(optionalStage.isPresent(), "Expected a stage with side inputs.");
+    ExecutableStage stage = optionalStage.get();
+
+    ExecutableProcessBundleDescriptor descriptor =
+        ProcessBundleDescriptors.fromExecutableStage(
+            "test_stage",
+            stage,
+            dataServer.getApiServiceDescriptor(),
+            stateServer.getApiServiceDescriptor());
+    // TODO: This cast is nonsense
+    RemoteInputDestination<WindowedValue<byte[]>> remoteDestination =
+        (RemoteInputDestination<WindowedValue<byte[]>>)
+            (RemoteInputDestination) descriptor.getRemoteInputDestination();
+
+    BundleProcessor<byte[]> processor = controlClient.getProcessor(
+        descriptor.getProcessBundleDescriptor(), remoteDestination, stateDelegator);
+    Map<Target, Coder<WindowedValue<?>>> outputTargets = descriptor.getOutputTargetCoders();
+    Map<Target, Collection<WindowedValue<?>>> outputValues = new HashMap<>();
+    Map<Target, RemoteOutputReceiver<?>> outputReceivers = new HashMap<>();
+    for (Entry<Target, Coder<WindowedValue<?>>> targetCoder : outputTargets.entrySet()) {
+      List<WindowedValue<?>> outputContents = Collections.synchronizedList(new ArrayList<>());
+      outputValues.put(targetCoder.getKey(), outputContents);
+      outputReceivers.put(
+          targetCoder.getKey(),
+          RemoteOutputReceiver.of(targetCoder.getValue(), outputContents::add));
+    }
+
+    Iterable<byte[]> sideInputData = Arrays.asList(
+        CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "A"),
+        CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "B"),
+        CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "C"));
+    StateRequestHandler stateRequestHandler =
+        StateRequestHandlers.forMultimapSideInputHandlerFactory(
+            descriptor,
+            new MultimapSideInputHandlerFactory() {
+              @Override
+              public <K, V, W extends BoundedWindow> MultimapSideInputHandler<K, V, W> forSideInput(
+                  String pTransformId, String sideInputId, Coder<K> keyCoder, Coder<V> valueCoder,
+                  Coder<W> windowCoder) {
+                return new MultimapSideInputHandler<K, V, W>() {
+                  @Override
+                  public Iterable<V> get(K key, W window) {
+                    return (Iterable) sideInputData;
+                  }
+                };
+              }
+            });
+
+    try (ActiveBundle<byte[]> bundle = processor.newBundle(outputReceivers, stateRequestHandler)) {
+      bundle.getInputReceiver().accept(WindowedValue.valueInGlobalWindow(
+          CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "X")));
+      bundle.getInputReceiver().accept(WindowedValue.valueInGlobalWindow(
+          CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "Y")));
+    }
+    for (Collection<WindowedValue<?>> windowedValues : outputValues.values()) {
+      assertThat(
+          windowedValues,
+          containsInAnyOrder(
+              WindowedValue.valueInGlobalWindow(kvBytes("X", "A")),
+              WindowedValue.valueInGlobalWindow(kvBytes("X", "B")),
+              WindowedValue.valueInGlobalWindow(kvBytes("X", "C")),
+              WindowedValue.valueInGlobalWindow(kvBytes("Y", "A")),
+              WindowedValue.valueInGlobalWindow(kvBytes("Y", "B")),
+              WindowedValue.valueInGlobalWindow(kvBytes("Y", "C"))));
+    }
+  }
+
   private KV<byte[], byte[]> kvBytes(String key, long value) throws CoderException {
     return KV.of(
         CoderUtils.encodeToByteArray(StringUtf8Coder.of(), key),
         CoderUtils.encodeToByteArray(BigEndianLongCoder.of(), value));
   }
+
+  private KV<byte[], byte[]> kvBytes(String key, String value) throws CoderException {
+    return KV.of(
+        CoderUtils.encodeToByteArray(StringUtf8Coder.of(), key),
+        CoderUtils.encodeToByteArray(StringUtf8Coder.of(), value));
+  }
 }
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
index 4ddd5127f93..ec58915fc02 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DataStreams.java
@@ -36,35 +36,137 @@
  * {@link #outbound(OutputChunkConsumer)} treats a single {@link OutputStream} as multiple
  * {@link ByteString}s.
  */
-// TODO: Migrate logic from BeamFnDataBufferingOutboundObserver to support Outbound
 public class DataStreams {
+  public static final int DEFAULT_OUTBOUND_BUFFER_LIMIT_BYTES = 1_000_000;
+
   /**
    * Converts multiple {@link ByteString}s into a single {@link InputStream}.
    *
    * <p>The iterator is accessed lazily. The supplied {@link Iterator} should block until
    * either it knows that no more values will be provided or it has the next {@link ByteString}.
+   *
+   * <p>Note that this {@link InputStream} follows the Beam Fn API specification for forcing values
+   * that decode consuming zero bytes to consuming exactly one byte.
    */
   public static InputStream inbound(Iterator<ByteString> bytes) {
     return new Inbound(bytes);
   }
 
   /**
-   * Converts a single {@link OutputStream} into multiple {@link ByteString ByteStrings}.
+   * Converts a single element delimited {@link OutputStream} into multiple
+   * {@link ByteString ByteStrings}.
+   *
+   * <p>Note that users must call {@link ElementDelimitedOutputStream#delimitElement} after each
+   * element.
+   *
+   * <p>Note that this {@link OutputStream} follows the Beam Fn API specification for forcing values
+   * that encode producing zero bytes to produce exactly one byte.
+   */
+  public static ElementDelimitedOutputStream outbound(OutputChunkConsumer<ByteString> consumer) {
+    return outbound(consumer, DEFAULT_OUTBOUND_BUFFER_LIMIT_BYTES);
+  }
+
+  /**
+   * Converts a single element delimited {@link OutputStream} into multiple
+   * {@link ByteString ByteStrings} using the specified maximum chunk size.
+   *
+   * <p>Note that users must call {@link ElementDelimitedOutputStream#delimitElement} after each
+   * element.
+   *
+   * <p>Note that this {@link OutputStream} follows the Beam Fn API specification for forcing values
+   * that encode producing zero bytes to produce exactly one byte.
+   */
+  public static ElementDelimitedOutputStream outbound(
+      OutputChunkConsumer<ByteString> consumer, int maximumChunkSize) {
+    return new ElementDelimitedOutputStream(consumer, maximumChunkSize);
+  }
+
+  /**
+   * An adapter which wraps an {@link OutputChunkConsumer} as an {@link OutputStream}.
+   *
+   * <p>Note that this adapter follows the Beam Fn API specification for forcing values that encode
+   * producing zero bytes to produce exactly one byte.
+   *
+   * <p>Note that users must invoke {@link #delimitElement} at each element boundary.
    */
-  public static OutputStream outbound(OutputChunkConsumer<ByteString> consumer) {
-    // TODO: Migrate logic from BeamFnDataBufferingOutboundObserver
-    throw new UnsupportedOperationException();
+  public static final class ElementDelimitedOutputStream extends OutputStream {
+    private final OutputChunkConsumer<ByteString> consumer;
+    private final ByteString.Output output;
+    private final int maximumChunkSize;
+    int previousPosition;
+
+    public ElementDelimitedOutputStream(
+        OutputChunkConsumer<ByteString> consumer, int maximumChunkSize) {
+      this.consumer = consumer;
+      this.maximumChunkSize = maximumChunkSize;
+      this.output = ByteString.newOutput(maximumChunkSize);
+    }
+
+    public void delimitElement() throws IOException {
+      // If the previous encoding was exactly zero bytes, output a single marker byte as per
+      // https://s.apache.org/beam-fn-api-send-and-receive-data
+      if (previousPosition == output.size()) {
+        write(0);
+      }
+      previousPosition = output.size();
+    }
+
+    @Override
+    public void write(int i) throws IOException {
+      output.write(i);
+      if (maximumChunkSize == output.size()) {
+        internalFlush();
+      }
+    }
+
+    @Override
+    public void write(byte[] b, int offset, int length) throws IOException {
+      int spaceRemaining = maximumChunkSize - output.size();
+      // Fill the first partially filled buffer.
+      if (length > spaceRemaining) {
+        output.write(b, offset, spaceRemaining);
+        offset += spaceRemaining;
+        length -= spaceRemaining;
+        internalFlush();
+      }
+      // Fill buffers completely.
+      while (length > maximumChunkSize) {
+        output.write(b, offset, maximumChunkSize);
+        offset += maximumChunkSize;
+        length -= maximumChunkSize;
+        internalFlush();
+      }
+      // Fill any remainder.
+      output.write(b, offset, length);
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (output.size() > 0) {
+        consumer.read(output.toByteString());
+      }
+      output.close();
+    }
+
+    /**
+     * Can only be called if at least one byte has been written.
+     */
+    private void internalFlush() throws IOException {
+      consumer.read(output.toByteString());
+      output.reset();
+      // Set the previous position to an invalid position representing that a previous buffer
+      // was written to.
+      previousPosition = -1;
+    }
   }
 
   /**
-   * Reads chunks of output.
+   * A callback which is invoked whenever the {@link #outbound} {@link OutputStream} becomes full.
    *
-   * @deprecated Used as a temporary placeholder until implementation of
-   * {@link #outbound(OutputChunkConsumer)}.
+   * <p>See {@link #outbound(OutputChunkConsumer)}.
    */
-  @Deprecated
   public interface OutputChunkConsumer<T> {
-    void read(T chunk) throws Exception;
+    void read(T chunk) throws IOException;
   }
 
   /**
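The chunking and zero-byte padding behavior that `ElementDelimitedOutputStream` implements above can be illustrated with a simplified, self-contained sketch. This is not Beam's API: the class name `ChunkingOutputStream` and the plain `Consumer<byte[]>` callback are stand-ins for `ElementDelimitedOutputStream` and `OutputChunkConsumer`, and protobuf `ByteString` is replaced with byte arrays. The logic mirrors the diff: buffer up to a maximum chunk size, emit full chunks eagerly, and pad any element that encoded to zero bytes with a single `0x00` byte, per the Beam Fn API data-plane convention.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Consumer;

/**
 * Simplified sketch of an element-delimited, chunking output stream.
 * Buffers bytes up to a maximum chunk size, handing each full chunk to a
 * consumer; elements that encode to zero bytes are padded with one zero
 * byte, mirroring the Beam Fn API convention.
 */
class ChunkingOutputStream extends OutputStream {
  private final Consumer<byte[]> consumer;
  private final int maxChunkSize;
  private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  // Buffer size at the previous element boundary; -1 once a chunk has been
  // emitted, so an element spanning a chunk boundary is never padded.
  private int previousPosition;

  ChunkingOutputStream(Consumer<byte[]> consumer, int maxChunkSize) {
    this.consumer = consumer;
    this.maxChunkSize = maxChunkSize;
  }

  /** Call after each element; pads zero-byte encodings with a single byte. */
  void delimitElement() throws IOException {
    if (previousPosition == buffer.size()) {
      write(0);
    }
    previousPosition = buffer.size();
  }

  @Override
  public void write(int b) throws IOException {
    buffer.write(b);
    if (buffer.size() == maxChunkSize) {
      flushChunk();
    }
  }

  @Override
  public void close() {
    if (buffer.size() > 0) {
      consumer.accept(buffer.toByteArray());
    }
  }

  private void flushChunk() {
    consumer.accept(buffer.toByteArray());
    buffer.reset();
    previousPosition = -1;
  }
}
```

With a chunk size of 3, writing a 2-byte element, a 6-byte element, and a 1-byte element (delimiting after each) emits exactly three 3-byte chunks, matching the `testNonEmptyElementsAreChunked` case in the diff below.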
diff --git a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java
index 35e18571db0..03c5f313aaa 100644
--- a/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java
+++ b/sdks/java/fn-execution/src/test/java/org/apache/beam/sdk/fn/stream/DataStreamsTest.java
@@ -17,9 +17,12 @@
  */
 package org.apache.beam.sdk.fn.stream;
 
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assume.assumeTrue;
 
 import com.google.common.collect.Iterators;
@@ -30,6 +33,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -40,6 +44,7 @@
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.fn.stream.DataStreams.BlockingQueueIterator;
 import org.apache.beam.sdk.fn.stream.DataStreams.DataStreamDecoder;
+import org.apache.beam.sdk.fn.stream.DataStreams.ElementDelimitedOutputStream;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.junit.Rule;
 import org.junit.Test;
@@ -161,4 +166,47 @@ public void testNonEmptyInputStreamWithZeroLengthCoder() throws Exception {
       decoder.next();
     }
   }
+
+  @RunWith(JUnit4.class)
+  public static class ElementDelimitedOutputStreamTest {
+    @Test
+    public void testNothingWritten() throws Exception {
+      List<ByteString> output = new ArrayList<>();
+      ElementDelimitedOutputStream outputStream = new ElementDelimitedOutputStream(output::add, 3);
+      outputStream.close();
+      assertThat(output, hasSize(0));
+    }
+
+    @Test
+    public void testEmptyElementsArePadded() throws Exception {
+      List<ByteString> output = new ArrayList<>();
+      ElementDelimitedOutputStream outputStream = new ElementDelimitedOutputStream(output::add, 3);
+      outputStream.delimitElement();
+      outputStream.delimitElement();
+      outputStream.delimitElement();
+      outputStream.delimitElement();
+      outputStream.delimitElement();
+      outputStream.close();
+      assertThat(output, contains(
+          ByteString.copyFrom(new byte[3]),
+          ByteString.copyFrom(new byte[2])));
+    }
+
+    @Test
+    public void testNonEmptyElementsAreChunked() throws Exception {
+      List<ByteString> output = new ArrayList<>();
+      ElementDelimitedOutputStream outputStream = new ElementDelimitedOutputStream(output::add, 3);
+      outputStream.write(new byte[] { 0x01, 0x02 });
+      outputStream.delimitElement();
+      outputStream.write(new byte[] { 0x03, 0x04, 0x05, 0x06, 0x07, 0x08 });
+      outputStream.delimitElement();
+      outputStream.write(0x09);
+      outputStream.delimitElement();
+      outputStream.close();
+      assertThat(output, contains(
+          ByteString.copyFrom(new byte[] { 0x01, 0x02, 0x03 }),
+          ByteString.copyFrom(new byte[] { 0x04, 0x05, 0x06 }),
+          ByteString.copyFrom(new byte[] { 0x07, 0x08, 0x09 })));
+    }
+  }
 }


 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 104107)
    Time Spent: 3h 50m  (was: 3h 40m)

> Executable stages allow side input coders to be set and/or queried
> ------------------------------------------------------------------
>
>                 Key: BEAM-4271
>                 URL: https://issues.apache.org/jira/browse/BEAM-4271
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Ben Sidhom
>            Assignee: Luke Cwik
>            Priority: Major
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> ProcessBundleDescriptors may contain side input references from inner PTransforms. These side inputs do not have explicit coders; instead, SDK harnesses use the PCollection coders by default.
> Using the default PCollection coder as specified at pipeline construction is in general not the correct thing to do. When PCollection elements are materialized, any coders unknown to a runner are length-prefixed. This means that materialized PCollections do not use their original element coders. Side inputs are delivered to SDKs via MultimapSideInput StateRequests. The responses to these requests are expected to contain all of the values for a given key (and window), encoded with the PCollection's KV.value coder and concatenated. However, at the time of serving these requests on the runner side, we do not have enough information to reconstruct the original value coders.
> There are different ways to address this issue. For example:
>  * Modify the associated PCollection coder to match the coder that the runner uses to materialize elements. This means that anywhere a given PCollection is used within a given bundle, it will use the runner-safe coder. This may introduce inefficiencies but should be "correct".
>  * Annotate side inputs with explicit coders. This guarantees that the key and value coders used by the runner match the coders used by SDKs. Furthermore, it allows the _runners_ to specify coders. This involves changes to the proto models and all SDKs.
>  * Annotate side input state requests with both key and value coders. This inverts the expected responsibility and has the SDK determine runner coders. Additionally, because runners do not understand all SDK types, additional coder substitution will need to be done at request handling time to make sure that the requested coder can be instantiated and will remain consistent with the SDK coder. This requires only small changes to SDKs because they may opt to use their default PCollection coders.
> All of these approaches have their own downsides. Explicit side input coders are probably the right thing to do long-term, but the simplest change for now is to modify PCollection coders to match exactly how they are materialized.
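The length-prefixing the description refers to can be sketched in plain Java. This is an illustrative simplification, not Beam's `LengthPrefixCoder` (which uses varints and nested coders): each value's encoded bytes are preceded by a 4-byte length, so a reader that cannot interpret the inner encoding can still split a concatenated stream back into individual values, which is the property a runner relies on when serving concatenated side-input values.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified sketch of length-prefixed encoding: each value's bytes are
 * preceded by a fixed 4-byte length, so a concatenated stream of values
 * can be split without understanding the inner (UTF-8) encoding.
 */
class LengthPrefix {
  /** Encodes values as [len][bytes][len][bytes]... */
  static byte[] encodeAll(List<String> values) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    for (String value : values) {
      byte[] encoded = value.getBytes(StandardCharsets.UTF_8); // inner coder
      out.writeInt(encoded.length); // the length prefix
      out.write(encoded);
    }
    return bytes.toByteArray();
  }

  /** Splits a concatenated stream back into values using only the prefixes. */
  static List<String> decodeAll(byte[] data) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
    List<String> values = new ArrayList<>();
    while (in.available() > 0) {
      byte[] encoded = new byte[in.readInt()];
      in.readFully(encoded);
      values.add(new String(encoded, StandardCharsets.UTF_8));
    }
    return values;
  }
}
```

The trade-off the first option above accepts is that using such a runner-safe coder everywhere the PCollection appears in a bundle adds the prefix overhead even where the SDK could have decoded the original encoding directly.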



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
