beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [1/2] beam git commit: Port fn_api_runner to be able to use runner protos.
Date Tue, 20 Jun 2017 20:47:49 GMT
Repository: beam
Updated Branches:
  refs/heads/master f51fdd960 -> e4ef23e16


Port fn_api_runner to be able to use runner protos.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/08ec0d4d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/08ec0d4d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/08ec0d4d

Branch: refs/heads/master
Commit: 08ec0d4dbff330ecd48c806cd764ab5a96835bd9
Parents: f51fdd9
Author: Robert Bradshaw <robertwb@gmail.com>
Authored: Tue Jun 20 11:01:03 2017 -0700
Committer: Robert Bradshaw <robertwb@gmail.com>
Committed: Tue Jun 20 13:47:30 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/runners/pipeline_context.py     |  17 +-
 .../runners/portability/fn_api_runner.py        | 166 ++++++++++++-
 .../runners/portability/fn_api_runner_test.py   |  20 +-
 .../apache_beam/runners/worker/sdk_worker.py    | 243 ++++++++++++++++++-
 4 files changed, 420 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/08ec0d4d/sdks/python/apache_beam/runners/pipeline_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
index e212abf..c2ae3f3 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -24,6 +24,7 @@ For internal use only; no backwards-compatibility guarantees.
 from apache_beam import pipeline
 from apache_beam import pvalue
 from apache_beam import coders
+from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.transforms import core
 
@@ -42,9 +43,10 @@ class _PipelineContextMap(object):
     self._id_to_proto = proto_map if proto_map else {}
     self._counter = 0
 
-  def _unique_ref(self):
+  def _unique_ref(self, obj=None):
     self._counter += 1
-    return "ref_%s_%s" % (self._obj_type.__name__, self._counter)
+    return "ref_%s_%s_%s" % (
+        self._obj_type.__name__, type(obj).__name__, self._counter)
 
   def populate_map(self, proto_map):
     for id, proto in self._id_to_proto.items():
@@ -52,7 +54,7 @@ class _PipelineContextMap(object):
 
   def get_id(self, obj):
     if obj not in self._obj_to_id:
-      id = self._unique_ref()
+      id = self._unique_ref(obj)
       self._id_to_obj[id] = obj
       self._obj_to_id[obj] = id
       self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
@@ -79,11 +81,16 @@ class PipelineContext(object):
       # TODO: environment
   }
 
-  def __init__(self, context_proto=None):
+  def __init__(self, proto=None):
+    if isinstance(proto, beam_fn_api_pb2.ProcessBundleDescriptor):
+      proto = beam_runner_api_pb2.Components(
+          coders=dict(proto.codersyyy.items()),
+          windowing_strategies=dict(proto.windowing_strategies.items()),
+          environments=dict(proto.environments.items()))
     for name, cls in self._COMPONENT_TYPES.items():
       setattr(
           self, name, _PipelineContextMap(
-              self, cls, getattr(context_proto, name, None)))
+              self, cls, getattr(proto, name, None)))
 
   @staticmethod
   def from_runner_api(proto):

http://git-wip-us.apache.org/repos/asf/beam/blob/08ec0d4d/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index d792131..dabb7d6 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -24,9 +24,10 @@ import Queue as queue
 import threading
 
 from concurrent import futures
+from google.protobuf import wrappers_pb2
 import grpc
 
-import apache_beam as beam
+import apache_beam as beam  # pylint: disable=ungrouped-imports
 from apache_beam.coders import WindowedValueCoder
 from apache_beam.coders.coder_impl import create_InputStream
 from apache_beam.coders.coder_impl import create_OutputStream
@@ -34,10 +35,13 @@ from apache_beam.internal import pickler
 from apache_beam.io import iobase
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners import pipeline_context
 from apache_beam.runners.portability import maptask_executor_runner
 from apache_beam.runners.worker import data_plane
 from apache_beam.runners.worker import operation_specs
 from apache_beam.runners.worker import sdk_worker
+from apache_beam.utils import proto_utils
 
 # This module is experimental. No backwards-compatibility guarantees.
 
@@ -110,9 +114,13 @@ OLDE_SOURCE_SPLITTABLE_DOFN_DATA = pickler.dumps(
 
 class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
 
-  def __init__(self):
+  def __init__(self, use_runner_protos=False):
     super(FnApiRunner, self).__init__()
     self._last_uid = -1
+    if use_runner_protos:
+      self._map_task_to_protos = self._map_task_to_runner_protos
+    else:
+      self._map_task_to_protos = self._map_task_to_fn_protos
 
   def has_metrics_support(self):
     return False
@@ -123,7 +131,140 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
 
   def _map_task_registration(self, map_task, state_handler,
                              data_operation_spec):
+    input_data, side_input_data, runner_sinks, process_bundle_descriptor = (
+        self._map_task_to_protos(map_task, data_operation_spec))
+    # Side inputs will be accessed over the state API.
+    for key, elements_data in side_input_data.items():
+      state_key = beam_fn_api_pb2.StateKey.MultimapSideInput(key=key)
+      state_handler.Clear(state_key)
+      state_handler.Append(state_key, [elements_data])
+    return beam_fn_api_pb2.InstructionRequest(
+        instruction_id=self._next_uid(),
+        register=beam_fn_api_pb2.RegisterRequest(
+            process_bundle_descriptor=[process_bundle_descriptor])
+        ), runner_sinks, input_data
+
+  def _map_task_to_runner_protos(self, map_task, data_operation_spec):
+    input_data = {}
+    side_input_data = {}
+    runner_sinks = {}
+
+    context = pipeline_context.PipelineContext()
+    transform_protos = {}
+    used_pcollections = {}
+
+    def uniquify(*names):
+      # An injective mapping from string* to string.
+      return ':'.join("%s:%d" % (name, len(name)) for name in names)
+
+    def pcollection_id(op_ix, out_ix):
+      if (op_ix, out_ix) not in used_pcollections:
+        used_pcollections[op_ix, out_ix] = uniquify(
+            map_task[op_ix][0], 'out', str(out_ix))
+      return used_pcollections[op_ix, out_ix]
+
+    def get_inputs(op):
+      if hasattr(op, 'inputs'):
+        inputs = op.inputs
+      elif hasattr(op, 'input'):
+        inputs = [op.input]
+      else:
+        inputs = []
+      return {'in%s' % ix: pcollection_id(*input)
+              for ix, input in enumerate(inputs)}
+
+    def get_outputs(op_ix):
+      op = map_task[op_ix][1]
+      return {tag: pcollection_id(op_ix, out_ix)
+              for out_ix, tag in enumerate(getattr(op, 'output_tags', ['out']))}
+
+    for op_ix, (stage_name, operation) in enumerate(map_task):
+      transform_id = uniquify(stage_name)
+
+      if isinstance(operation, operation_specs.WorkerInMemoryWrite):
+        # Write this data back to the runner.
+        runner_sinks[(transform_id, 'out')] = operation
+        transform_spec = beam_runner_api_pb2.FunctionSpec(
+            urn=sdk_worker.DATA_OUTPUT_URN,
+            parameter=proto_utils.pack_Any(data_operation_spec))
+
+      elif isinstance(operation, operation_specs.WorkerRead):
+        # A Read from an in-memory source is done over the data plane.
+        if (isinstance(operation.source.source,
+                       maptask_executor_runner.InMemorySource)
+            and isinstance(operation.source.source.default_output_coder(),
+                           WindowedValueCoder)):
+          input_data[(transform_id, 'input')] = self._reencode_elements(
+              operation.source.source.read(None),
+              operation.source.source.default_output_coder())
+          transform_spec = beam_runner_api_pb2.FunctionSpec(
+              urn=sdk_worker.DATA_INPUT_URN,
+              parameter=proto_utils.pack_Any(data_operation_spec))
+
+        else:
+          # Otherwise serialize the source and execute it there.
+          # TODO: Use SDFs with an initial impulse.
+          transform_spec = beam_runner_api_pb2.FunctionSpec(
+              urn=sdk_worker.PYTHON_SOURCE_URN,
+              parameter=proto_utils.pack_Any(
+                  wrappers_pb2.BytesValue(
+                      value=pickler.dumps(operation.source.source))))
+
+      elif isinstance(operation, operation_specs.WorkerDoFn):
+        # Record the contents of each side input for access via the state api.
+        side_input_extras = []
+        for si in operation.side_inputs:
+          assert isinstance(si.source, iobase.BoundedSource)
+          element_coder = si.source.default_output_coder()
+          # TODO(robertwb): Actually flesh out the ViewFn API.
+          side_input_extras.append((si.tag, element_coder))
+          side_input_data[sdk_worker.side_input_tag(transform_id, si.tag)] = (
+              self._reencode_elements(
+                  si.source.read(si.source.get_range_tracker(None, None)),
+                  element_coder))
+        augmented_serialized_fn = pickler.dumps(
+            (operation.serialized_fn, side_input_extras))
+        transform_spec = beam_runner_api_pb2.FunctionSpec(
+            urn=sdk_worker.PYTHON_DOFN_URN,
+            parameter=proto_utils.pack_Any(
+                wrappers_pb2.BytesValue(value=augmented_serialized_fn)))
+
+      elif isinstance(operation, operation_specs.WorkerFlatten):
+        # Flatten is nice and simple.
+        transform_spec = beam_runner_api_pb2.FunctionSpec(
+            urn=sdk_worker.IDENTITY_DOFN_URN)
+
+      else:
+        raise NotImplementedError(operation)
+
+      transform_protos[transform_id] = beam_runner_api_pb2.PTransform(
+          unique_name=stage_name,
+          spec=transform_spec,
+          inputs=get_inputs(operation),
+          outputs=get_outputs(op_ix))
+
+    pcollection_protos = {
+        name: beam_runner_api_pb2.PCollection(
+            unique_name=name,
+            coder_id=context.coders.get_id(
+                map_task[op_id][1].output_coders[out_id]))
+        for (op_id, out_id), name in used_pcollections.items()
+    }
+    # Must follow creation of pcollection_protos to capture used coders.
+    context_proto = context.to_runner_api()
+    process_bundle_descriptor = beam_fn_api_pb2.ProcessBundleDescriptor(
+        id=self._next_uid(),
+        transforms=transform_protos,
+        pcollections=pcollection_protos,
+        codersyyy=dict(context_proto.coders.items()),
+        windowing_strategies=dict(context_proto.windowing_strategies.items()),
+        environments=dict(context_proto.environments.items()))
+    return input_data, side_input_data, runner_sinks, process_bundle_descriptor
+
+  def _map_task_to_fn_protos(self, map_task, data_operation_spec):
+
     input_data = {}
+    side_input_data = {}
     runner_sinks = {}
     transforms = []
     transform_index_to_id = {}
@@ -264,9 +405,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
             element_coder.get_impl().encode_to_stream(
                 element, output_stream, True)
           elements_data = output_stream.get()
-          state_key = beam_fn_api_pb2.StateKey.MultimapSideInput(key=view_id)
-          state_handler.Clear(state_key)
-          state_handler.Append(state_key, elements_data)
+          side_input_data[view_id] = elements_data
 
       elif isinstance(operation, operation_specs.WorkerFlatten):
         fn = sdk_worker.pack_function_spec_data(
@@ -299,13 +438,11 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
       transforms.append(ptransform)
 
     process_bundle_descriptor = beam_fn_api_pb2.ProcessBundleDescriptor(
-        id=self._next_uid(), coders=coders.values(),
+        id=self._next_uid(),
+        coders=coders.values(),
         primitive_transform=transforms)
-    return beam_fn_api_pb2.InstructionRequest(
-        instruction_id=self._next_uid(),
-        register=beam_fn_api_pb2.RegisterRequest(
-            process_bundle_descriptor=[process_bundle_descriptor
-                                      ])), runner_sinks, input_data
+
+    return input_data, side_input_data, runner_sinks, process_bundle_descriptor
 
   def _run_map_task(
       self, map_task, control_handler, state_handler, data_plane_handler,
@@ -467,3 +604,10 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
       self.data_plane_handler.close()
       self.control_server.stop(5).wait()
       self.data_server.stop(5).wait()
+
+  @staticmethod
+  def _reencode_elements(elements, element_coder):
+    output_stream = create_OutputStream()
+    for element in elements:
+      element_coder.get_impl().encode_to_stream(element, output_stream, True)
+    return output_stream.get()

http://git-wip-us.apache.org/repos/asf/beam/blob/08ec0d4d/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 66d985a..e2eae26 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -23,10 +23,26 @@ from apache_beam.runners.portability import fn_api_runner
 from apache_beam.runners.portability import maptask_executor_runner_test
 
 
-class FnApiRunnerTest(maptask_executor_runner_test.MapTaskExecutorRunnerTest):
+class FnApiRunnerTestWithRunnerProtos(
+    maptask_executor_runner_test.MapTaskExecutorRunnerTest):
 
   def create_pipeline(self):
-    return beam.Pipeline(runner=fn_api_runner.FnApiRunner())
+    return beam.Pipeline(
+        runner=fn_api_runner.FnApiRunner(use_runner_protos=True))
+
+  def test_combine_per_key(self):
+    # TODO(robertwb): Implement PGBKCV operation.
+    pass
+
+  # Inherits all tests from maptask_executor_runner.MapTaskExecutorRunner
+
+
+class FnApiRunnerTestWithFnProtos(
+    maptask_executor_runner_test.MapTaskExecutorRunnerTest):
+
+  def create_pipeline(self):
+    return beam.Pipeline(
+        runner=fn_api_runner.FnApiRunner(use_runner_protos=False))
 
   def test_combine_per_key(self):
     # TODO(robertwb): Implement PGBKCV operation.

http://git-wip-us.apache.org/repos/asf/beam/blob/08ec0d4d/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index d08b179..fd7ecc4 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -36,11 +36,13 @@ from apache_beam.coders import coder_impl
 from apache_beam.coders import WindowedValueCoder
 from apache_beam.internal import pickler
 from apache_beam.io import iobase
-from apache_beam.runners.dataflow.native_io import iobase as native_iobase
-from apache_beam.utils import counters
 from apache_beam.portability.api import beam_fn_api_pb2
+from apache_beam.runners.dataflow.native_io import iobase as native_iobase
+from apache_beam.runners import pipeline_context
 from apache_beam.runners.worker import operation_specs
 from apache_beam.runners.worker import operations
+from apache_beam.utils import counters
+from apache_beam.utils import proto_utils
 
 # This module is experimental. No backwards-compatibility guarantees.
 
@@ -62,6 +64,10 @@ PYTHON_DOFN_URN = 'urn:org.apache.beam:dofn:java:0.1'
 PYTHON_SOURCE_URN = 'urn:org.apache.beam:source:java:0.1'
 
 
+def side_input_tag(transform_id, tag):
+  return str("%d[%s][%s]" % (len(transform_id), transform_id, tag))
+
+
 class RunnerIOOperation(operations.Operation):
   """Common baseclass for runner harness IO operations."""
 
@@ -208,6 +214,23 @@ def load_compressed(compressed_data):
     dill.dill._trace(False)  # pylint: disable=protected-access
 
 
+def memoize(func):
+  cache = {}
+  missing = object()
+
+  def wrapper(*args):
+    result = cache.get(args, missing)
+    if result is missing:
+      result = cache[args] = func(*args)
+    return result
+  return wrapper
+
+
+def only_element(iterable):
+  element, = iterable
+  return element
+
+
 class SdkHarness(object):
 
   def __init__(self, control_channel):
@@ -296,6 +319,51 @@ class SdkWorker(object):
     return response
 
   def create_execution_tree(self, descriptor):
+    if descriptor.primitive_transform:
+      return self.create_execution_tree_from_fn_api(descriptor)
+    else:
+      return self.create_execution_tree_from_runner_api(descriptor)
+
+  def create_execution_tree_from_runner_api(self, descriptor):
+    # TODO(robertwb): Figure out the correct prefix to use for output counters
+    # from StateSampler.
+    counter_factory = counters.CounterFactory()
+    state_sampler = statesampler.StateSampler(
+        'fnapi-step%s-' % descriptor.id, counter_factory)
+
+    transform_factory = BeamTransformFactory(
+        descriptor, self.data_channel_factory, counter_factory, state_sampler,
+        self.state_handler)
+
+    pcoll_consumers = collections.defaultdict(list)
+    for transform_id, transform_proto in descriptor.transforms.items():
+      for pcoll_id in transform_proto.inputs.values():
+        pcoll_consumers[pcoll_id].append(transform_id)
+
+    @memoize
+    def get_operation(transform_id):
+      transform_consumers = {
+          tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
+          for tag, pcoll_id
+          in descriptor.transforms[transform_id].outputs.items()
+      }
+      return transform_factory.create_operation(
+          transform_id, transform_consumers)
+
+    # Operations must be started (hence returned) in order.
+    @memoize
+    def topological_height(transform_id):
+      return 1 + max(
+          [0] +
+          [topological_height(consumer)
+           for pcoll in descriptor.transforms[transform_id].outputs.values()
+           for consumer in pcoll_consumers[pcoll]])
+
+    return [get_operation(transform_id)
+            for transform_id in sorted(
+                descriptor.transforms, key=topological_height, reverse=True)]
+
+  def create_execution_tree_from_fn_api(self, descriptor):
     # TODO(vikasrk): Add an id field to Coder proto and use that instead.
     coders = {coder.function_spec.id: operation_specs.get_coder_from_spec(
         json.loads(unpack_function_spec_data(coder.function_spec)))
@@ -418,14 +486,14 @@ class SdkWorker(object):
       reversed_ops.append(op)
       ops_by_id[transform.id] = op
 
-    return list(reversed(reversed_ops)), ops_by_id
+    return list(reversed(reversed_ops))
 
   def process_bundle(self, request, instruction_id):
-    ops, ops_by_id = self.create_execution_tree(
+    ops = self.create_execution_tree(
         self.fns[request.process_bundle_descriptor_reference])
 
     expected_inputs = []
-    for _, op in ops_by_id.items():
+    for op in ops:
       if isinstance(op, DataOutputOperation):
         # TODO(robertwb): Is there a better way to pass the instruction id to
         # the operation?
@@ -445,9 +513,7 @@ class SdkWorker(object):
       for data in input_op.data_channel.input_elements(
           instruction_id, [input_op.target]):
         # ignores input name
-        target_op = ops_by_id[data.target.primitive_transform_reference]
-        # lacks coder for non-input ops
-        target_op.process_encoded(data.data)
+        input_op.process_encoded(data.data)
 
     # Finish all operations.
     for op in ops:
@@ -455,3 +521,164 @@ class SdkWorker(object):
       op.finish()
 
     return beam_fn_api_pb2.ProcessBundleResponse()
+
+
+class BeamTransformFactory(object):
+  """Factory for turning transform_protos into executable operations."""
+  def __init__(self, descriptor, data_channel_factory, counter_factory,
+               state_sampler, state_handler):
+    self.descriptor = descriptor
+    self.data_channel_factory = data_channel_factory
+    self.counter_factory = counter_factory
+    self.state_sampler = state_sampler
+    self.state_handler = state_handler
+    self.context = pipeline_context.PipelineContext(descriptor)
+
+  _known_urns = {}
+
+  @classmethod
+  def register_urn(cls, urn, parameter_type):
+    def wrapper(func):
+      cls._known_urns[urn] = func, parameter_type
+      return func
+    return wrapper
+
+  def create_operation(self, transform_id, consumers):
+    transform_proto = self.descriptor.transforms[transform_id]
+    creator, parameter_type = self._known_urns[transform_proto.spec.urn]
+    parameter = proto_utils.unpack_Any(
+        transform_proto.spec.parameter, parameter_type)
+    return creator(self, transform_id, transform_proto, parameter, consumers)
+
+  def get_coder(self, coder_id):
+    return self.context.coders.get_by_id(coder_id)
+
+  def get_output_coders(self, transform_proto):
+    return {
+        tag: self.get_coder(self.descriptor.pcollections[pcoll_id].coder_id)
+        for tag, pcoll_id in transform_proto.outputs.items()
+    }
+
+  def get_only_output_coder(self, transform_proto):
+    return only_element(self.get_output_coders(transform_proto).values())
+
+  def get_input_coders(self, transform_proto):
+    return {
+        tag: self.get_coder(self.descriptor.pcollections[pcoll_id].coder_id)
+        for tag, pcoll_id in transform_proto.inputs.items()
+    }
+
+  def get_only_input_coder(self, transform_proto):
+    return only_element(self.get_input_coders(transform_proto).values())
+
+  # TODO(robertwb): Update all operations to take these in the constructor.
+  @staticmethod
+  def augment_oldstyle_op(op, step_name, consumers, tag_list=None):
+    op.step_name = step_name
+    for tag, op_consumers in consumers.items():
+      for consumer in op_consumers:
+        op.add_receiver(consumer, tag_list.index(tag) if tag_list else 0)
+    return op
+
+
+@BeamTransformFactory.register_urn(
+    DATA_INPUT_URN, beam_fn_api_pb2.RemoteGrpcPort)
+def create(factory, transform_id, transform_proto, grpc_port, consumers):
+  target = beam_fn_api_pb2.Target(
+      primitive_transform_reference=transform_id,
+      name=only_element(transform_proto.outputs.keys()))
+  return DataInputOperation(
+      transform_proto.unique_name,
+      transform_proto.unique_name,
+      consumers,
+      factory.counter_factory,
+      factory.state_sampler,
+      factory.get_only_output_coder(transform_proto),
+      input_target=target,
+      data_channel=factory.data_channel_factory.create_data_channel(grpc_port))
+
+
+@BeamTransformFactory.register_urn(
+    DATA_OUTPUT_URN, beam_fn_api_pb2.RemoteGrpcPort)
+def create(factory, transform_id, transform_proto, grpc_port, consumers):
+  target = beam_fn_api_pb2.Target(
+      primitive_transform_reference=transform_id,
+      name='out')
+  return DataOutputOperation(
+      transform_proto.unique_name,
+      transform_proto.unique_name,
+      consumers,
+      factory.counter_factory,
+      factory.state_sampler,
+      # TODO(robertwb): Perhaps this could be distinct from the input coder?
+      factory.get_only_input_coder(transform_proto),
+      target=target,
+      data_channel=factory.data_channel_factory.create_data_channel(grpc_port))
+
+
+@BeamTransformFactory.register_urn(PYTHON_SOURCE_URN, wrappers_pb2.BytesValue)
+def create(factory, transform_id, transform_proto, parameter, consumers):
+  source = pickler.loads(parameter.value)
+  spec = operation_specs.WorkerRead(
+      iobase.SourceBundle(1.0, source, None, None),
+      [WindowedValueCoder(source.default_output_coder())])
+  return factory.augment_oldstyle_op(
+      operations.ReadOperation(
+          transform_proto.unique_name,
+          spec,
+          factory.counter_factory,
+          factory.state_sampler),
+      transform_proto.unique_name,
+      consumers)
+
+
+@BeamTransformFactory.register_urn(PYTHON_DOFN_URN, wrappers_pb2.BytesValue)
+def create(factory, transform_id, transform_proto, parameter, consumers):
+  dofn_data = pickler.loads(parameter.value)
+  if len(dofn_data) == 2:
+    # Has side input data.
+    serialized_fn, side_input_data = dofn_data
+  else:
+    # No side input data.
+    serialized_fn, side_input_data = parameter.value, []
+
+  def create_side_input(tag, coder):
+    # TODO(robertwb): Extract windows (and keys) out of element data.
+    # TODO(robertwb): Extract state key from ParDoPayload.
+    return operation_specs.WorkerSideInputSource(
+        tag=tag,
+        source=SideInputSource(
+            factory.state_handler,
+            beam_fn_api_pb2.StateKey.MultimapSideInput(
+                key=side_input_tag(transform_id, tag)),
+            coder=coder))
+  output_tags = list(transform_proto.outputs.keys())
+  output_coders = factory.get_output_coders(transform_proto)
+  spec = operation_specs.WorkerDoFn(
+      serialized_fn=serialized_fn,
+      output_tags=output_tags,
+      input=None,
+      side_inputs=[
+          create_side_input(tag, coder) for tag, coder in side_input_data],
+      output_coders=[output_coders[tag] for tag in output_tags])
+  return factory.augment_oldstyle_op(
+      operations.DoOperation(
+          transform_proto.unique_name,
+          spec,
+          factory.counter_factory,
+          factory.state_sampler),
+      transform_proto.unique_name,
+      consumers,
+      output_tags)
+
+
+@BeamTransformFactory.register_urn(IDENTITY_DOFN_URN, None)
+def create(factory, transform_id, transform_proto, unused_parameter, consumers):
+  return factory.augment_oldstyle_op(
+      operations.FlattenOperation(
+          transform_proto.unique_name,
+          None,
+          factory.counter_factory,
+          factory.state_sampler),
+      transform_proto.unique_name,
+      consumers)


Mime
View raw message