beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-2597) FlinkRunner ExecutableStage batch operator
Date Tue, 08 May 2018 22:26:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-2597?focusedWorklogId=99757&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-99757 ]

ASF GitHub Bot logged work on BEAM-2597:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/May/18 22:25
            Start Date: 08/May/18 22:25
    Worklog Time Spent: 10m 
      Work Description: jkff commented on a change in pull request #5285: [BEAM-2597] Flink batch ExecutableStage operator
URL: https://github.com/apache/beam/pull/5285#discussion_r186884010
 
 

 ##########
 File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
 ##########
 @@ -38,36 +53,117 @@
 public class FlinkExecutableStageFunction<InputT>
     extends RichMapPartitionFunction<WindowedValue<InputT>, RawUnionValue> {
 
+  // Main constructor fields. All must be Serializable because Flink distributes Functions to
+  // task managers via java serialization.
+
   // The executable stage this function will run.
   private final RunnerApi.ExecutableStagePayload stagePayload;
   // Pipeline options. Used for provisioning api.
-  private final Struct pipelineOptions;
+  private final JobInfo jobInfo;
   // Map from PCollection id to the union tag used to represent this PCollection in the output.
   private final Map<String, Integer> outputMap;
+  private final SerializableSupplier<FlinkBundleFactory> bundleFactorySupplier;
+  private final DistributedCachePool.Factory cachePoolFactory;
+  private final FlinkStateRequestHandlerFactory stateHandlerFactory;
+
+  // Worker-local fields. These should only be constructed and consumed on Flink TaskManagers.
+  private transient RuntimeContext runtimeContext;
+  private transient StateRequestHandler stateRequestHandler;
+  private transient StageBundleFactory stageBundleFactory;
+  private transient AutoCloseable distributedCacheCloser;
 
   public FlinkExecutableStageFunction(
       RunnerApi.ExecutableStagePayload stagePayload,
-      Struct pipelineOptions,
-      Map<String, Integer> outputMap) {
+      JobInfo jobInfo,
+      Map<String, Integer> outputMap,
+      SerializableSupplier<FlinkBundleFactory> bundleFactorySupplier,
 
 Review comment:
   Hmm, I wonder whether it makes sense to encapsulate these three things in a "FlinkExecutableStageContext"
and have a ContextFactory, so that we at least avoid the proliferation of factories.
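
   A minimal sketch of what this suggestion could look like, not the code in the pull request. Every
type here (`BundleFactory`, `CachePool`, `StateHandlerFactory`, `StageContext`, `SimpleStageContext`,
`StageFunction`) is a hypothetical stand-in invented for illustration and is not a Beam or Flink API.
The idea is that the function carries one serializable context factory and builds the context on the
worker, instead of carrying three separate factories.

```java
import java.io.Serializable;

// Hypothetical stand-ins for the three collaborators passed to the constructor.
// These are NOT Beam types; they only mark where the real ones would go.
interface BundleFactory {}
interface CachePool {}
interface StateHandlerFactory {}

// Hypothetical context that groups the three worker-side dependencies.
interface StageContext {
  BundleFactory bundleFactory();
  CachePool cachePool();
  StateHandlerFactory stateHandlerFactory();

  // The only factory the function needs to carry. It is Serializable so it can
  // be shipped with the function; the context itself is built on the worker.
  interface Factory extends Serializable {
    StageContext get();
  }
}

// Trivial context implementation, used here only for illustration.
final class SimpleStageContext implements StageContext {
  private final BundleFactory bundleFactory = new BundleFactory() {};
  private final CachePool cachePool = new CachePool() {};
  private final StateHandlerFactory stateHandlerFactory = new StateHandlerFactory() {};

  @Override public BundleFactory bundleFactory() { return bundleFactory; }
  @Override public CachePool cachePool() { return cachePool; }
  @Override public StateHandlerFactory stateHandlerFactory() { return stateHandlerFactory; }
}

// Simplified stand-in for the function class: one factory field instead of three.
final class StageFunction implements Serializable {
  private final StageContext.Factory contextFactory;
  // Worker-local state; rebuilt in open() rather than serialized.
  private transient StageContext context;

  StageFunction(StageContext.Factory contextFactory) {
    this.contextFactory = contextFactory;
  }

  // Called on the worker before processing; materializes the context.
  void open() {
    context = contextFactory.get();
  }

  StageContext context() {
    return context;
  }
}
```

   With this shape, a caller would write something like `new StageFunction(SimpleStageContext::new)`,
and any further worker-side dependency could be added to the context without changing the
constructor signature.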

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 99757)
    Time Spent: 2h 40m  (was: 2.5h)

> FlinkRunner ExecutableStage batch operator
> ------------------------------------------
>
>                 Key: BEAM-2597
>                 URL: https://issues.apache.org/jira/browse/BEAM-2597
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-flink
>            Reporter: Kenneth Knowles
>            Assignee: Ben Sidhom
>            Priority: Major
>              Labels: portability
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> This operator will execute user code in the context of an SDK harness by constructing a ProcessBundleDescriptor from an ExecutableStage (physical stage plan) and sending instructions/elements over the control and data planes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
