beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <>
Subject [jira] [Work logged] (BEAM-4271) Executable stages allow side input coders to be set and/or queried
Date Sat, 19 May 2018 00:31:00 GMT


ASF GitHub Bot logged work on BEAM-4271:

                Author: ASF GitHub Bot
            Created on: 19/May/18 00:30
            Start Date: 19/May/18 00:30
    Worklog Time Spent: 10m 
      Work Description: bsidhom commented on a change in pull request #5374: [BEAM-4271] Support
side inputs for ExecutableStage and provide runner side utilities for handling multimap side

 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/
 @@ -146,13 +189,89 @@ private static TargetEncoding addStageOutput(
+  private static Map<String, Map<String, MultimapSideInputSpec>> forMultimapSideInputs(
+      ExecutableStage stage,
+      Components components,
+      ProcessBundleDescriptor.Builder bundleDescriptorBuilder) throws IOException {
+    ImmutableTable.Builder<String, String, MultimapSideInputSpec> idsToSpec =
+        ImmutableTable.builder();
+    for (SideInputReference sideInputReference : stage.getSideInputs()) {
+      // Update the coder specification for side inputs to be length prefixed so that the
+      // SDK and Runner agree on how to encode/decode the key, window, and values for multimap
+      // side inputs.
+      String pCollectionId = sideInputReference.collection().getId();
+      RunnerApi.MessageWithComponents lengthPrefixedSideInputCoder =
+          LengthPrefixUnknownCoders.forCoder(
+              components.getPcollectionsOrThrow(pCollectionId).getCoderId(),
+              components,
+              false);
+      String wireCoderId =
+          addWireCoder(sideInputReference.collection(), components, bundleDescriptorBuilder);
+      String lengthPrefixedSideInputCoderId = SyntheticComponents.uniqueId(
+          String.format(
+              "fn/side_input/%s",
+              components.getPcollectionsOrThrow(pCollectionId).getCoderId()),
+          bundleDescriptorBuilder.getCodersMap().keySet()::contains);
+      bundleDescriptorBuilder.putCoders(
+          lengthPrefixedSideInputCoderId, lengthPrefixedSideInputCoder.getCoder());
+      bundleDescriptorBuilder.putAllCoders(
+          lengthPrefixedSideInputCoder.getComponents().getCodersMap());
+      bundleDescriptorBuilder.putPcollections(
+          pCollectionId,
+          bundleDescriptorBuilder
+              .getPcollectionsMap()
+              .get(pCollectionId)
+              .toBuilder()
+              .setCoderId(lengthPrefixedSideInputCoderId)
+              .build());
+      FullWindowedValueCoder<KV<?, ?>> coder =
+          (FullWindowedValueCoder) WireCoders.instantiateRunnerWireCoder(
 Review comment:
   I would like that in practice, although currently we require WireCoders on the runner side
(at least in Flink) in order to give collections an associated serialization mechanism. This
may change down the line when everything is "just bytes", but that will require some additional
machinery. For example, to extract keys and values for grouping operations.
   The good news is that runner-side coders need not be kept in sync with the wire coders
here. Anyway, this seems fine for now and we can address that when we get around to it.
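The diff chooses the id for the length-prefixed side input coder with `SyntheticComponents.uniqueId(prefix, predicate)`. It passes `bundleDescriptorBuilder.getCodersMap().keySet()::contains` as the "already used" test, so the new coder cannot overwrite an existing entry in the descriptor's coder map. Below is a minimal sketch of that idea. `UniqueIdSketch.uniqueId`, its `_1`, `_2` suffix scheme, and the coder id `c1` are hypothetical and illustrative only. This is not Beam's actual implementation.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

final class UniqueIdSketch {
  // Hypothetical helper: returns the prefix itself if it is unused, otherwise the first
  // "prefix_N" (N = 1, 2, ...) that the predicate reports as unused.
  static String uniqueId(String prefix, Predicate<String> isUsed) {
    if (!isUsed.test(prefix)) {
      return prefix;
    }
    for (int suffix = 1; ; suffix++) {
      String candidate = prefix + "_" + suffix;
      if (!isUsed.test(candidate)) {
        return candidate;
      }
    }
  }

  public static void main(String[] args) {
    // Stand-in for bundleDescriptorBuilder.getCodersMap().keySet(): ids already registered.
    Set<String> coderIds = new HashSet<>();
    coderIds.add("fn/side_input/c1");

    String first = uniqueId(String.format("fn/side_input/%s", "c1"), coderIds::contains);
    coderIds.add(first); // register the id, as putCoders(...) would in the diff
    String second = uniqueId(String.format("fn/side_input/%s", "c1"), coderIds::contains);

    System.out.println(first);  // fn/side_input/c1_1
    System.out.println(second); // fn/side_input/c1_2
  }
}
```

Registering each id right after it is generated keeps subsequent calls from handing out the same id twice. The diff gets the same effect by calling `putCoders` immediately after `uniqueId`.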

This is an automated message from the Apache Git Service.

Issue Time Tracking

    Worklog Id:     (was: 103612)
    Time Spent: 3h 10m  (was: 3h)

> Executable stages allow side input coders to be set and/or queried
> ------------------------------------------------------------------
>                 Key: BEAM-4271
>                 URL:
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Ben Sidhom
>            Assignee: Luke Cwik
>            Priority: Major
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
> ProcessBundleDescriptors may contain side input references from inner PTransforms. These
side inputs do not have explicit coders; instead, SDK harnesses use the PCollection coders
by default.
> Using the default PCollection coder as specified at pipeline construction is not correct
in general. When PCollection elements are materialized, any coders unknown to a runner are
length-prefixed. This means that materialized PCollections do not use their original element
coders. Side inputs are delivered to SDKs via MultimapSideInput StateRequests. The responses
to these requests are expected to contain all of the values for a given key (and window),
each encoded with the value coder of the PCollection's KV coder and concatenated together.
However, at the time of serving these requests on the runner side, we do not have enough
information to reconstruct the original value coders.
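The following sketch shows why length-prefixing lets a runner serve these responses without knowing the value coder. If each encoded value is framed with its byte length, the runner can split, store, and re-concatenate the values as opaque bytes. The sketch assumes a varint length followed by the raw bytes. This framing has the same general shape as a length-prefix coder, but the class and method names are hypothetical and the code is not Beam's `LengthPrefixCoder`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class LengthPrefixSketch {
  // Appends an unsigned varint: 7 payload bits per byte, high bit set on all but the last byte.
  static void writeVarInt(ByteArrayOutputStream out, int value) {
    while ((value & ~0x7F) != 0) {
      out.write((value & 0x7F) | 0x80);
      value >>>= 7;
    }
    out.write(value);
  }

  // Frames each already-encoded value with its length and concatenates the results.
  static byte[] concatLengthPrefixed(List<byte[]> encodedValues) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (byte[] value : encodedValues) {
      writeVarInt(out, value.length);
      out.write(value, 0, value.length);
    }
    return out.toByteArray();
  }

  // Splits a concatenation back into the opaque value bytes using only the framing;
  // the original value coder is never consulted.
  static List<byte[]> splitLengthPrefixed(byte[] bytes) {
    List<byte[]> values = new ArrayList<>();
    int pos = 0;
    while (pos < bytes.length) {
      int length = 0;
      int shift = 0;
      while (true) {
        if (pos >= bytes.length) {
          throw new IllegalArgumentException("truncated length prefix");
        }
        int b = bytes[pos++] & 0xFF;
        length |= (b & 0x7F) << shift;
        if ((b & 0x80) == 0) {
          break;
        }
        shift += 7;
        if (shift > 28) {
          throw new IllegalArgumentException("length prefix too long");
        }
      }
      if (length > bytes.length - pos) {
        throw new IllegalArgumentException("truncated value");
      }
      values.add(Arrays.copyOfRange(bytes, pos, pos + length));
      pos += length;
    }
    return values;
  }

  public static void main(String[] args) {
    List<byte[]> values = List.of(
        "a".getBytes(StandardCharsets.UTF_8),
        new byte[0],
        "bc".getBytes(StandardCharsets.UTF_8));
    byte[] wire = concatLengthPrefixed(values);
    System.out.println(Arrays.toString(wire)); // [1, 97, 0, 2, 98, 99]
    for (byte[] v : splitLengthPrefixed(wire)) {
      System.out.println("'" + new String(v, StandardCharsets.UTF_8) + "'");
    }
  }
}
```

Without the length prefix, the concatenation `[97, 98, 99]` could not be split back into `"a"`, `""`, `"bc"` by a runner that cannot instantiate the SDK's value coder.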
> There are different ways to address this issue. For example:
>  * Modify the associated PCollection coder to match the coder that the runner uses to
materialize elements. This means that anywhere a given PCollection is used within a given
bundle, it will use the runner-safe coder. This may introduce inefficiencies but should be
>  * Annotate side inputs with explicit coders. This guarantees that the key and value
coders used by the runner match the coders used by SDKs. Furthermore, it allows the _runners_
to specify coders. This involves changes to the proto models and all SDKs.
>  * Annotate side input state requests with both key and value coders. This inverts the
expected responsibility and has the SDK determine runner coders. Additionally, because runners
do not understand all SDK types, additional coder substitution will need to be done at request
handling time to make sure that the requested coder can be instantiated and will remain consistent
with the SDK coder. This requires only small changes to SDKs because they may opt to use their
default PCollection coders.
> All of these approaches have their own downsides. Explicit side input coders are
probably the right thing to do long-term, but the simplest change for now is to modify PCollection
coders to match exactly how they're materialized.
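The chosen approach rewrites the PCollection coder so that it matches how the runner materializes elements. This amounts to walking the coder tree and wrapping every component the runner cannot instantiate in a length-prefix coder. The diff above does this through `LengthPrefixUnknownCoders.forCoder`. The sketch below models that rewrite with a hypothetical `Coder` class that holds a URN and its components. The set of runner-known URNs and the SDK-only URN `example:my_pojo:v1` are assumptions for illustration, and the code does not use Beam's proto types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class LengthPrefixUnknownSketch {
  // Simplified stand-in for a coder spec: a URN plus component coders (not Beam's proto).
  static final class Coder {
    final String urn;
    final List<Coder> components;

    Coder(String urn, List<Coder> components) {
      this.urn = urn;
      this.components = List.copyOf(components);
    }

    static Coder of(String urn, Coder... components) {
      return new Coder(urn, Arrays.asList(components));
    }

    @Override
    public String toString() {
      if (components.isEmpty()) {
        return urn;
      }
      return urn + components.stream()
          .map(Coder::toString)
          .collect(Collectors.joining(", ", "(", ")"));
    }
  }

  static final String LENGTH_PREFIX = "beam:coder:length_prefix:v1";

  // Assumption: the coder URNs this hypothetical runner can instantiate itself.
  static final Set<String> RUNNER_KNOWN = Set.of(
      "beam:coder:bytes:v1",
      "beam:coder:varint:v1",
      "beam:coder:kv:v1",
      "beam:coder:global_window:v1",
      "beam:coder:windowed_value:v1",
      LENGTH_PREFIX);

  // Known coders are rebuilt with rewritten components; unknown coders are wrapped,
  // without being inspected, in a length prefix so the runner can treat them as bytes.
  static Coder lengthPrefixUnknown(Coder coder) {
    if (coder.urn.equals(LENGTH_PREFIX)) {
      return coder; // already framed; avoid wrapping twice
    }
    if (!RUNNER_KNOWN.contains(coder.urn)) {
      return Coder.of(LENGTH_PREFIX, coder);
    }
    List<Coder> rewritten = new ArrayList<>();
    for (Coder component : coder.components) {
      rewritten.add(lengthPrefixUnknown(component));
    }
    return new Coder(coder.urn, rewritten);
  }

  public static void main(String[] args) {
    Coder pcollectionCoder = Coder.of("beam:coder:windowed_value:v1",
        Coder.of("beam:coder:kv:v1",
            Coder.of("beam:coder:bytes:v1"),
            Coder.of("example:my_pojo:v1")), // hypothetical SDK-only value coder
        Coder.of("beam:coder:global_window:v1"));
    System.out.println(lengthPrefixUnknown(pcollectionCoder));
  }
}
```

Running `main` wraps only the hypothetical `example:my_pojo:v1` component. The known `windowed_value`, `kv`, `bytes`, and `global_window` coders are kept as-is, so in this model the runner can still decode keys and windows while treating each value as opaque length-prefixed bytes.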

This message was sent by Atlassian JIRA
