beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <>
Subject [jira] [Work logged] (BEAM-4271) Executable stages allow side input coders to be set and/or queried
Date Sat, 19 May 2018 01:46:00 GMT


ASF GitHub Bot logged work on BEAM-4271:

                Author: ASF GitHub Bot
            Created on: 19/May/18 01:45
            Start Date: 19/May/18 01:45
    Worklog Time Spent: 10m 
      Work Description: tweise commented on a change in pull request #5374: [BEAM-4271] Support
side inputs for ExecutableStage and provide runner side utilities for handling multimap side

 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/
 @@ -146,13 +189,89 @@ private static TargetEncoding addStageOutput(
+  private static Map<String, Map<String, MultimapSideInputSpec>> forMultimapSideInputs(
+      ExecutableStage stage,
+      Components components,
+      ProcessBundleDescriptor.Builder bundleDescriptorBuilder) throws IOException {
+    ImmutableTable.Builder<String, String, MultimapSideInputSpec> idsToSpec =
+        ImmutableTable.builder();
+    for (SideInputReference sideInputReference : stage.getSideInputs()) {
+      // Update the coder specification for side inputs to be length prefixed so that the
+      // SDK and Runner agree on how to encode/decode the key, window, and values for multimap
+      // side inputs.
+      String pCollectionId = sideInputReference.collection().getId();
+      RunnerApi.MessageWithComponents lengthPrefixedSideInputCoder =
+          LengthPrefixUnknownCoders.forCoder(
+              components.getPcollectionsOrThrow(pCollectionId).getCoderId(),
+              components,
+              false);
+      String wireCoderId =
+          addWireCoder(sideInputReference.collection(), components, bundleDescriptorBuilder);
+      String lengthPrefixedSideInputCoderId = SyntheticComponents.uniqueId(
+          String.format(
+              "fn/side_input/%s",
+              components.getPcollectionsOrThrow(pCollectionId).getCoderId()),
+          bundleDescriptorBuilder.getCodersMap().keySet()::contains);
+      bundleDescriptorBuilder.putCoders(
+          lengthPrefixedSideInputCoderId, lengthPrefixedSideInputCoder.getCoder());
+      bundleDescriptorBuilder.putAllCoders(
+          lengthPrefixedSideInputCoder.getComponents().getCodersMap());
+      bundleDescriptorBuilder.putPcollections(
+          pCollectionId,
+          bundleDescriptorBuilder
+              .getPcollectionsMap()
+              .get(pCollectionId)
+              .toBuilder()
+              .setCoderId(lengthPrefixedSideInputCoderId)
+              .build());
+      FullWindowedValueCoder<KV<?, ?>> coder =
+          (FullWindowedValueCoder) WireCoders.instantiateRunnerWireCoder(
 Review comment:
   Serialization will also be needed for push back with side inputs (at least for streaming).
   In the old, non-portable Flink runner's ParDo execution, the operator also requires access
to the key to manage state and timers. Will the key extraction be in the SDK (and a parameter
to the runner)?


Issue Time Tracking

    Worklog Id:     (was: 103627)
    Time Spent: 3.5h  (was: 3h 20m)

> Executable stages allow side input coders to be set and/or queried
> ------------------------------------------------------------------
>                 Key: BEAM-4271
>                 URL:
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Ben Sidhom
>            Assignee: Luke Cwik
>            Priority: Major
>          Time Spent: 3.5h
>  Remaining Estimate: 0h
> ProcessBundleDescriptors may contain side input references from inner PTransforms. These
side inputs do not have explicit coders; instead, SDK harnesses use the PCollection coders
by default.
> Using the default PCollection coder as specified at pipeline construction is in general
not the correct thing to do. When PCollection elements are materialized, any coders unknown
to a runner are length-prefixed. This means that materialized PCollections do not use their
original element coders. Side inputs are delivered to SDKs via MultimapSideInput StateRequests.
The responses to these requests are expected to contain all of the values for a given key
(and window), coded with the PCollection KV.value coder, concatenated. However, at the time
of serving these requests on the runner side, we do not have enough information to reconstruct
the original value coders.
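
To illustrate the length-prefixing described above, here is a minimal Java sketch. It assumes each opaque value is framed with a base-128 varint byte count; that framing is an assumption made for illustration, not something stated in this issue. The class and method names (LengthPrefixSketch, concatLengthPrefixed, splitLengthPrefixed) are hypothetical and are not Beam APIs. The sketch shows how values for one key and window could be concatenated, and later split again, by a party that does not know the original value coder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LengthPrefixSketch {
  // Writes a non-negative int as an unsigned base-128 varint (assumed framing).
  static void writeVarInt(int value, ByteArrayOutputStream out) {
    while ((value & ~0x7F) != 0) {
      out.write((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.write(value);
  }

  // Reads a varint written by writeVarInt; rejects truncated or oversized input.
  static int readVarInt(ByteArrayInputStream in) {
    int result = 0;
    int shift = 0;
    while (true) {
      int b = in.read();
      if (b < 0) {
        throw new IllegalArgumentException("truncated varint");
      }
      result |= (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
      shift += 7;
      if (shift > 28) {
        throw new IllegalArgumentException("varint too long");
      }
    }
  }

  // Concatenates already-encoded values, each preceded by its byte length.
  static byte[] concatLengthPrefixed(List<byte[]> encodedValues) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (byte[] value : encodedValues) {
      writeVarInt(value.length, out);
      out.write(value, 0, value.length);
    }
    return out.toByteArray();
  }

  // Splits a concatenated payload back into opaque values without
  // knowing anything about the coder that produced them.
  static List<byte[]> splitLengthPrefixed(byte[] payload) {
    ByteArrayInputStream in = new ByteArrayInputStream(payload);
    List<byte[]> values = new ArrayList<>();
    while (in.available() > 0) {
      int length = readVarInt(in);
      if (length < 0 || length > in.available()) {
        throw new IllegalArgumentException("truncated value");
      }
      byte[] value = new byte[length];
      if (length > 0) {
        in.read(value, 0, length);
      }
      values.add(value);
    }
    return values;
  }

  public static void main(String[] args) {
    List<byte[]> values = Arrays.asList(
        "a".getBytes(StandardCharsets.UTF_8),
        new byte[0],
        "hello".getBytes(StandardCharsets.UTF_8));
    byte[] payload = concatLengthPrefixed(values);
    System.out.println(splitLengthPrefixed(payload).size());
  }
}
```

Without the length prefix, the boundaries between concatenated values can only be recovered by running the original value coder, which is exactly what the runner cannot do for coders it does not understand.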
> There are different ways to address this issue. For example:
>  * Modify the associated PCollection coder to match the coder that the runner uses to
materialize elements. This means that anywhere a given PCollection is used within a given
bundle, it will use the runner-safe coder. This may introduce inefficiencies but should be
>  * Annotate side inputs with explicit coders. This guarantees that the key and value
coders used by the runner match the coders used by SDKs. Furthermore, it allows the _runners_
to specify coders. This involves changes to the proto models and all SDKs.
>  * Annotate side input state requests with both key and value coders. This inverts the
expected responsibility and has the SDK determine runner coders. Additionally, because runners
do not understand all SDK types, additional coder substitution will need to be done at request
handling time to make sure that the requested coder can be instantiated and will remain consistent
with the SDK coder. This requires only small changes to SDKs because they may opt to use their
default PCollection coders.
> All of these approaches have their own downsides. Explicit side input coders are
probably the right thing to do long-term, but the simplest change for now is to modify PCollection
coders to match exactly how they're materialized.
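
The "simplest change" named above, pointing a PCollection at a runner-safe coder, can be sketched with plain maps. This is a hedged illustration only: the maps stand in for the coder and PCollection tables of a bundle descriptor, and CoderSubstitutionSketch, uniqueId, and useLengthPrefixedCoder are hypothetical names. The ":N" suffix scheme is likewise an assumption and not necessarily how Beam's SyntheticComponents.uniqueId resolves collisions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class CoderSubstitutionSketch {
  // Returns idPrefix if it is unused; otherwise appends ":N" for the
  // smallest N that makes the id unused (hypothetical suffix scheme).
  static String uniqueId(String idPrefix, Set<String> usedIds) {
    if (!usedIds.contains(idPrefix)) {
      return idPrefix;
    }
    int i = 0;
    while (usedIds.contains(idPrefix + ":" + i)) {
      i++;
    }
    return idPrefix + ":" + i;
  }

  // Registers a length-prefixed variant of the PCollection's coder under a
  // fresh id and repoints the PCollection at it. Coders are modeled as
  // descriptive strings purely for illustration.
  static String useLengthPrefixedCoder(
      String pCollectionId,
      Map<String, String> pCollectionToCoderId,
      Map<String, String> coders) {
    String originalCoderId = pCollectionToCoderId.get(pCollectionId);
    if (originalCoderId == null) {
      throw new IllegalArgumentException("unknown PCollection: " + pCollectionId);
    }
    String newCoderId = uniqueId("fn/side_input/" + originalCoderId, coders.keySet());
    coders.put(newCoderId, "length_prefix(" + coders.get(originalCoderId) + ")");
    pCollectionToCoderId.put(pCollectionId, newCoderId);
    return newCoderId;
  }

  public static void main(String[] args) {
    Map<String, String> coders = new HashMap<>();
    coders.put("c1", "custom");
    Map<String, String> pCollections = new HashMap<>();
    pCollections.put("pc", "c1");
    System.out.println(useLengthPrefixedCoder("pc", pCollections, coders));
  }
}
```

Because the PCollection entry itself is rewritten, every use of that PCollection within the bundle sees the substituted coder, which is the trade-off the first option in the list describes.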
