beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-4271) Executable stages allow side input coders to be set and/or queried
Date Mon, 21 May 2018 16:20:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-4271?focusedWorklogId=104106&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104106 ]

ASF GitHub Bot logged work on BEAM-4271:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/May/18 16:19
            Start Date: 21/May/18 16:19
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on a change in pull request #5374: [BEAM-4271]
Support side inputs for ExecutableStage and provide runner side utilities for handling multimap
side inputs.
URL: https://github.com/apache/beam/pull/5374#discussion_r189639712
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessBundleDescriptors.java
 ##########
 @@ -146,13 +189,89 @@ private static TargetEncoding addStageOutput(
         wireCoder);
   }
 
+  private static Map<String, Map<String, MultimapSideInputSpec>> forMultimapSideInputs(
+      ExecutableStage stage,
+      Components components,
+      ProcessBundleDescriptor.Builder bundleDescriptorBuilder) throws IOException {
+    ImmutableTable.Builder<String, String, MultimapSideInputSpec> idsToSpec =
+        ImmutableTable.builder();
+    for (SideInputReference sideInputReference : stage.getSideInputs()) {
+      // Update the coder specification for side inputs to be length prefixed so that the
+      // SDK and Runner agree on how to encode/decode the key, window, and values for multimap
+      // side inputs.
+      String pCollectionId = sideInputReference.collection().getId();
+      RunnerApi.MessageWithComponents lengthPrefixedSideInputCoder =
+          LengthPrefixUnknownCoders.forCoder(
+              components.getPcollectionsOrThrow(pCollectionId).getCoderId(),
+              components,
+              false);
+      String wireCoderId =
+          addWireCoder(sideInputReference.collection(), components, bundleDescriptorBuilder);
+      String lengthPrefixedSideInputCoderId = SyntheticComponents.uniqueId(
+          String.format(
+              "fn/side_input/%s",
+              components.getPcollectionsOrThrow(pCollectionId).getCoderId()),
+          bundleDescriptorBuilder.getCodersMap().keySet()::contains);
+
+      bundleDescriptorBuilder.putCoders(
+          lengthPrefixedSideInputCoderId, lengthPrefixedSideInputCoder.getCoder());
+      bundleDescriptorBuilder.putAllCoders(
+          lengthPrefixedSideInputCoder.getComponents().getCodersMap());
+      bundleDescriptorBuilder.putPcollections(
+          pCollectionId,
+          bundleDescriptorBuilder
+              .getPcollectionsMap()
+              .get(pCollectionId)
+              .toBuilder()
+              .setCoderId(lengthPrefixedSideInputCoderId)
+              .build());
+
+      FullWindowedValueCoder<KV<?, ?>> coder =
+          (FullWindowedValueCoder) WireCoders.instantiateRunnerWireCoder(
 
 Review comment:
   With the interfaces as currently structured, we only support side inputs that don't need pushback (batch pipelines). Another interface will be added once the main window -> side input window mapping is added to the `BundleProcessor` for side inputs on unbounded PCollections.

   Yes, the SDK will be responsible for key extraction, and the extracted key will be provided to the Runner. This will be analogous to how the current MultimapSideInputHandler is provided the key, window, and relevant coders. See the BagUserStateHandler part of this PR.
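A minimal sketch of the runner-side lookup described in this comment and in the issue text below. It is not Beam's `MultimapSideInputHandler` API: the class `InMemoryMultimapSideInput` and its `add`/`get`/`decode` methods are hypothetical, and it assumes the SDK has already extracted and encoded the key and window into bytes and that the value coder is a length-prefix coder. Each value is framed as an unsigned varint length followed by its bytes (the framing Beam's `LengthPrefixCoder` uses), and the response for one key and window is those framed values concatenated.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical runner-side store for one multimap side input (not a Beam API). */
final class InMemoryMultimapSideInput {
  // byte[] uses identity equality, so the (key, window) pair is wrapped in ByteBuffers,
  // whose equals/hashCode compare contents.
  private final Map<List<ByteBuffer>, List<byte[]>> values = new HashMap<>();

  /** Records one encoded value under an already-encoded key and window. */
  void add(byte[] key, byte[] window, byte[] value) {
    values.computeIfAbsent(lookupKey(key, window), k -> new ArrayList<>()).add(value.clone());
  }

  /** Builds the response for (key, window): each value length-prefixed, then concatenated. */
  byte[] get(byte[] key, byte[] window) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (byte[] value : values.getOrDefault(lookupKey(key, window), List.of())) {
      writeVarInt(value.length, out);
      out.write(value, 0, value.length);
    }
    return out.toByteArray();
  }

  /** Splits a concatenated response back into the individual encoded values. */
  static List<byte[]> decode(byte[] response) {
    List<byte[]> result = new ArrayList<>();
    int pos = 0;
    while (pos < response.length) {
      // Read the unsigned varint length prefix, 7 bits per byte, least significant first.
      int length = 0;
      for (int shift = 0; ; shift += 7) {
        if (pos >= response.length || shift > 28) {
          throw new IllegalArgumentException("malformed length prefix");
        }
        int b = response[pos++] & 0xFF;
        length |= (b & 0x7F) << shift;
        if ((b & 0x80) == 0) {
          break;
        }
      }
      if (length < 0 || length > response.length - pos) {
        throw new IllegalArgumentException("truncated value");
      }
      result.add(Arrays.copyOfRange(response, pos, pos + length));
      pos += length;
    }
    return result;
  }

  private static List<ByteBuffer> lookupKey(byte[] key, byte[] window) {
    return List.of(ByteBuffer.wrap(key.clone()), ByteBuffer.wrap(window.clone()));
  }

  // Writes an unsigned varint: low 7 bits per byte, high bit set while more bytes follow.
  private static void writeVarInt(int v, ByteArrayOutputStream out) {
    while ((v & ~0x7F) != 0) {
      out.write((v & 0x7F) | 0x80);
      v >>>= 7;
    }
    out.write(v);
  }
}
```

Because every value carries its own length, the runner can serve and split the response without knowing the SDK's original value coder.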

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 104106)
    Time Spent: 3h 40m  (was: 3.5h)

> Executable stages allow side input coders to be set and/or queried
> ------------------------------------------------------------------
>
>                 Key: BEAM-4271
>                 URL: https://issues.apache.org/jira/browse/BEAM-4271
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Ben Sidhom
>            Assignee: Luke Cwik
>            Priority: Major
>          Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> ProcessBundleDescriptors may contain side input references from inner PTransforms. These side inputs do not have explicit coders; instead, SDK harnesses use the PCollection coders by default.
> Using the default PCollection coder as specified at pipeline construction is, in general, not the correct thing to do. When PCollection elements are materialized, any coders unknown to a runner are length-prefixed. This means that materialized PCollections do not use their original element coders. Side inputs are delivered to SDKs via MultimapSideInput StateRequests. The responses to these requests are expected to contain all of the values for a given key (and window), each encoded with the value coder of the PCollection's KV coder, concatenated together. However, at the time of serving these requests on the runner side, we do not have enough information to reconstruct the original value coders.
> There are different ways to address this issue. For example:
>  * Modify the associated PCollection coder to match the coder that the runner uses to materialize elements. This means that anywhere a given PCollection is used within a given bundle, it will use the runner-safe coder. This may introduce inefficiencies but should be "correct".
>  * Annotate side inputs with explicit coders. This guarantees that the key and value coders used by the runner match the coders used by SDKs. Furthermore, it allows the _runners_ to specify coders. This involves changes to the proto models and all SDKs.
>  * Annotate side input state requests with both key and value coders. This inverts the expected responsibility and has the SDK determine runner coders. Additionally, because runners do not understand all SDK types, additional coder substitution will need to be done at request handling time to make sure that the requested coder can be instantiated and will remain consistent with the SDK coder. This requires only small changes to SDKs because they may opt to use their default PCollection coders.
> All of these approaches have their own downsides. Explicit side input coders are probably the right thing to do long-term, but the simplest change for now is to modify PCollection coders to match exactly how they're materialized.
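The chosen fix depends on rewriting a coder into the form the runner materializes. A minimal sketch of that rewrite, loosely modeled on the idea behind the `LengthPrefixUnknownCoders` call in the diff above but not its implementation: `CoderNode` and the `RUNNER_KNOWN` set are hypothetical simplifications of the coder protos, and `example:coder:custom:v1` is a made-up URN standing in for an SDK-specific coder (the other URNs are standard Beam coder URNs). Coders the runner knows are kept and their components rewritten; any unknown coder is wrapped whole in a length-prefix coder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified coder tree: a URN plus component coders (not Beam's proto). */
final class CoderNode {
  static final String LENGTH_PREFIX = "beam:coder:length_prefix:v1";

  // Assumed set of coder URNs this runner can decode; a real runner's set differs.
  static final Set<String> RUNNER_KNOWN = Set.of(
      "beam:coder:kv:v1",
      "beam:coder:iterable:v1",
      "beam:coder:varint:v1",
      "beam:coder:bytes:v1",
      "beam:coder:global_window:v1");

  final String urn;
  final List<CoderNode> components;

  CoderNode(String urn, CoderNode... components) {
    this.urn = urn;
    this.components = List.of(components);
  }

  /**
   * Keeps coders the runner understands and rewrites their components recursively; any
   * unknown coder is wrapped, together with everything beneath it, in a length-prefix coder.
   */
  CoderNode lengthPrefixUnknown() {
    if (LENGTH_PREFIX.equals(urn)) {
      return this; // Already runner-safe: the runner can skip over the framed bytes.
    }
    if (!RUNNER_KNOWN.contains(urn)) {
      return new CoderNode(LENGTH_PREFIX, this);
    }
    List<CoderNode> rewritten = new ArrayList<>();
    for (CoderNode component : components) {
      rewritten.add(component.lengthPrefixUnknown());
    }
    return new CoderNode(urn, rewritten.toArray(new CoderNode[0]));
  }

  @Override
  public String toString() {
    if (components.isEmpty()) {
      return urn;
    }
    StringBuilder sb = new StringBuilder(urn).append('(');
    for (int i = 0; i < components.size(); i++) {
      if (i > 0) {
        sb.append(", ");
      }
      sb.append(components.get(i));
    }
    return sb.append(')').toString();
  }
}
```

Applied to a KV coder with a varint key and a custom value coder, this yields `kv(varint, length_prefix(custom))`, and applying it twice changes nothing. The diff above then registers such a rewritten coder under a fresh `fn/side_input/...` ID and points the side input's PCollection at it, so SDK and runner agree on the bytes.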



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
