beam-commits mailing list archives

From rober...@apache.org
Subject [2/2] beam git commit: Implement FnApi side inputs in Python.
Date Sun, 22 Oct 2017 07:40:37 GMT
Implement FnApi side inputs in Python.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/38556b78
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/38556b78
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/38556b78

Branch: refs/heads/master
Commit: 38556b78bfa874620ccb61a5179a44c7a51dbf55
Parents: a71f000
Author: Robert Bradshaw <robertwb@gmail.com>
Authored: Thu Oct 19 12:11:44 2017 -0700
Committer: Robert Bradshaw <robertwb@gmail.com>
Committed: Sun Oct 22 00:37:19 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py             |  30 ++--
 sdks/python/apache_beam/pvalue.py               | 103 +++++++++++++-
 .../runners/direct/transform_evaluator.py       |   9 +-
 .../runners/portability/fn_api_runner.py        | 142 +++++++++++++++----
 .../runners/portability/fn_api_runner_test.py   |  41 +++++-
 .../runners/worker/bundle_processor.py          | 116 +++++++++------
 .../apache_beam/runners/worker/data_plane.py    |  18 ++-
 .../apache_beam/runners/worker/operations.pxd   |   1 +
 .../apache_beam/runners/worker/operations.py    |  17 ++-
 .../apache_beam/runners/worker/sdk_worker.py    | 111 ++++++++++++++-
 sdks/python/apache_beam/transforms/core.py      |  23 ++-
 sdks/python/apache_beam/utils/urns.py           |   5 +
 12 files changed, 511 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/38556b78/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index c670978..62626a3 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -506,9 +506,6 @@ class Pipeline(object):
         self.visit_transform(transform_node)
 
       def visit_transform(self, transform_node):
-        if transform_node.side_inputs:
-          # No side inputs (yet).
-          Visitor.ok = False
         try:
           # Transforms must be picklable.
           pickler.loads(pickler.dumps(transform_node.transform,
@@ -730,8 +727,12 @@ class AppliedPTransform(object):
 
   def named_inputs(self):
     # TODO(BEAM-1833): Push names up into the sdk construction.
-    return {str(ix): input for ix, input in enumerate(self.inputs)
-            if isinstance(input, pvalue.PCollection)}
+    main_inputs = {str(ix): input
+                   for ix, input in enumerate(self.inputs)
+                   if isinstance(input, pvalue.PCollection)}
+    side_inputs = {'side%s' % ix: si.pvalue
+                   for ix, si in enumerate(self.side_inputs)}
+    return dict(main_inputs, **side_inputs)
 
   def named_outputs(self):
     return {str(tag): output for tag, output in self.outputs.items()
@@ -750,7 +751,6 @@ class AppliedPTransform(object):
         spec=transform_to_runner_api(self.transform, context),
         subtransforms=[context.transforms.get_id(part, label=part.full_label)
                        for part in self.parts],
-        # TODO(BEAM-115): Side inputs.
         inputs={tag: context.pcollections.get_id(pc)
                 for tag, pc in self.named_inputs().items()},
         outputs={str(tag): context.pcollections.get_id(out)
@@ -760,12 +760,26 @@ class AppliedPTransform(object):
 
   @staticmethod
   def from_runner_api(proto, context):
+    def is_side_input(tag):
+      # As per named_inputs() above.
+      return tag.startswith('side')
+    main_inputs = [context.pcollections.get_by_id(id)
+                   for tag, id in proto.inputs.items()
+                   if not is_side_input(tag)]
+    # Ordering is important here.
+    indexed_side_inputs = [(int(tag[4:]), context.pcollections.get_by_id(id))
+                           for tag, id in proto.inputs.items()
+                           if is_side_input(tag)]
+    side_inputs = [si for _, si in sorted(indexed_side_inputs)]
     result = AppliedPTransform(
         parent=None,
         transform=ptransform.PTransform.from_runner_api(proto.spec, context),
         full_label=proto.unique_name,
-        inputs=[
-            context.pcollections.get_by_id(id) for id in proto.inputs.values()])
+        inputs=main_inputs)
+    if result.transform and result.transform.side_inputs:
+      for si, pcoll in zip(result.transform.side_inputs, side_inputs):
+        si.pvalue = pcoll
+      result.side_inputs = tuple(result.transform.side_inputs)
     result.parts = [
         context.transforms.get_by_id(id) for id in proto.subtransforms]
     result.outputs = {

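The pipeline.py change above round-trips side inputs through the runner API by tagging them alongside the main inputs: main inputs keep their positional index as the tag, side inputs get a 'side<N>' tag, and the original ordering is recovered by sorting on the numeric suffix. A minimal, self-contained sketch of that convention (plain Python, not Beam code; the helper names are made up):

    def name_inputs(main_inputs, side_inputs):
        # Mirrors named_inputs(): positional tags for main inputs,
        # 'side<N>' tags for side inputs.
        named = {str(ix): pc for ix, pc in enumerate(main_inputs)}
        named.update({'side%s' % ix: si for ix, si in enumerate(side_inputs)})
        return named

    def split_inputs(named):
        # Mirrors from_runner_api(): partition on the 'side' prefix and
        # restore the original ordering from the numeric suffix.
        is_side = lambda tag: tag.startswith('side')
        mains = [pc for _, pc in sorted(
            (int(tag), pc) for tag, pc in named.items() if not is_side(tag))]
        sides = [pc for _, pc in sorted(
            (int(tag[4:]), pc) for tag, pc in named.items() if is_side(tag))]
        return mains, sides

    assert split_inputs(name_inputs(['main'], ['x', 'y'])) == (['main'], ['x', 'y'])
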
http://git-wip-us.apache.org/repos/asf/beam/blob/38556b78/sdks/python/apache_beam/pvalue.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index d2d3653..31922f3 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -28,7 +28,11 @@ from __future__ import absolute_import
 
 import itertools
 
+from apache_beam import coders
 from apache_beam import typehints
+from apache_beam.internal import pickler
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.utils import urns
 
 __all__ = [
     'PCollection',
@@ -127,8 +131,6 @@ class PCollection(PValue):
     return _InvalidUnpickledPCollection, ()
 
   def to_runner_api(self, context):
-    from apache_beam.portability.api import beam_runner_api_pb2
-    from apache_beam.internal import pickler
     return beam_runner_api_pb2.PCollection(
         unique_name='%d%s.%s' % (
             len(self.producer.full_label), self.producer.full_label, self.tag),
@@ -139,7 +141,6 @@ class PCollection(PValue):
 
   @staticmethod
   def from_runner_api(proto, context):
-    from apache_beam.internal import pickler
     # Producer and tag will be filled in later, the key point is that the
     # same object is returned for the same pcollection id.
     return PCollection(None, element_type=pickler.loads(proto.coder_id))
@@ -288,6 +289,81 @@ class AsSideInput(object):
   def element_type(self):
     return typehints.Any
 
+  # TODO(robertwb): Get rid of _from_runtime_iterable and _view_options
+  # in favor of _side_input_data().
+  def _side_input_data(self):
+    view_options = self._view_options()
+    from_runtime_iterable = type(self)._from_runtime_iterable
+    return SideInputData(
+        urns.ITERABLE_ACCESS,
+        self._window_mapping_fn,
+        lambda iterable: from_runtime_iterable(iterable, view_options),
+        self._input_element_coder())
+
+  def _input_element_coder(self):
+    return coders.WindowedValueCoder(
+        coders.registry.get_coder(self.pvalue.element_type),
+        window_coder=self.pvalue.windowing.windowfn.get_window_coder())
+
+  def to_runner_api(self, context):
+    return self._side_input_data().to_runner_api(context)
+
+  @staticmethod
+  def from_runner_api(proto, context):
+    return _UnpickledSideInput(
+        SideInputData.from_runner_api(proto, context))
+
+
+class _UnpickledSideInput(AsSideInput):
+  def __init__(self, side_input_data):
+    self._data = side_input_data
+    self._window_mapping_fn = side_input_data.window_mapping_fn
+
+  @staticmethod
+  def _from_runtime_iterable(it, options):
+    return options['data'].view_fn(it)
+
+  def _view_options(self):
+    return {
+        'data': self._data,
+        # For non-fn-api runners.
+        'window_mapping_fn': self._data.window_mapping_fn,
+    }
+
+  def _side_input_data(self):
+    return self._data
+
+
+class SideInputData(object):
+  """All of the data about a side input except for the bound PCollection."""
+  def __init__(self, access_pattern, window_mapping_fn, view_fn, coder):
+    self.access_pattern = access_pattern
+    self.window_mapping_fn = window_mapping_fn
+    self.view_fn = view_fn
+    self.coder = coder
+
+  def to_runner_api(self, unused_context):
+    return beam_runner_api_pb2.SideInput(
+        access_pattern=beam_runner_api_pb2.FunctionSpec(
+            urn=self.access_pattern),
+        view_fn=beam_runner_api_pb2.SdkFunctionSpec(
+            spec=beam_runner_api_pb2.FunctionSpec(
+                urn=urns.PICKLED_PYTHON_VIEWFN,
+                payload=pickler.dumps((self.view_fn, self.coder)))),
+        window_mapping_fn=beam_runner_api_pb2.SdkFunctionSpec(
+            spec=beam_runner_api_pb2.FunctionSpec(
+                urn=urns.PICKLED_WINDOW_MAPPING_FN,
+                payload=pickler.dumps(self.window_mapping_fn))))
+
+  @staticmethod
+  def from_runner_api(proto, unused_context):
+    assert proto.view_fn.spec.urn == urns.PICKLED_PYTHON_VIEWFN
+    assert proto.window_mapping_fn.spec.urn == urns.PICKLED_WINDOW_MAPPING_FN
+    return SideInputData(
+        proto.access_pattern.urn,
+        pickler.loads(proto.window_mapping_fn.spec.payload),
+        *pickler.loads(proto.view_fn.spec.payload))
+
 
 class AsSingleton(AsSideInput):
   """Marker specifying that an entire PCollection is to be used as a side input.
@@ -358,6 +434,13 @@ class AsIter(AsSideInput):
   def _from_runtime_iterable(it, options):
     return it
 
+  def _side_input_data(self):
+    return SideInputData(
+        urns.ITERABLE_ACCESS,
+        self._window_mapping_fn,
+        lambda iterable: iterable,
+        self._input_element_coder())
+
   @property
   def element_type(self):
     return typehints.Iterable[self.pvalue.element_type]
@@ -382,6 +465,13 @@ class AsList(AsSideInput):
   def _from_runtime_iterable(it, options):
     return list(it)
 
+  def _side_input_data(self):
+    return SideInputData(
+        urns.ITERABLE_ACCESS,
+        self._window_mapping_fn,
+        list,
+        self._input_element_coder())
+
 
 class AsDict(AsSideInput):
   """Marker specifying a PCollection to be used as an indexable side input.
@@ -403,6 +493,13 @@ class AsDict(AsSideInput):
   def _from_runtime_iterable(it, options):
     return dict(it)
 
+  def _side_input_data(self):
+    return SideInputData(
+        urns.ITERABLE_ACCESS,
+        self._window_mapping_fn,
+        dict,
+        self._input_element_coder())
+
 
 class EmptySideInput(object):
   """Value indicating when a singleton side input was empty.

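SideInputData, added above, captures everything about a side input except the bound PCollection: an access-pattern URN, a window-mapping function, a view function, and the element coder, with the two functions pickled into FunctionSpec payloads. The view function is what turns the raw per-window iterable recovered from state into the value the DoFn actually sees; a rough illustration (plain Python, not Beam code):

    # The different As* wrappers differ mainly in their view_fn:
    as_iter_view = lambda iterable: iterable   # AsIter: hand back the iterable
    as_list_view = list                        # AsList
    as_dict_view = dict                        # AsDict

    raw_values = [('a', 1), ('b', 2)]          # hypothetical decoded elements
    assert as_list_view(raw_values) == [('a', 1), ('b', 2)]
    assert as_dict_view(raw_values) == {'a': 1, 'b': 2}
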
http://git-wip-us.apache.org/repos/asf/beam/blob/38556b78/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 16a2991..2f3ac4f 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -584,10 +584,13 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
     assert len(self._outputs) == 1
     self.output_pcollection = list(self._outputs)[0]
 
-    # The input type of a GroupByKey will be KV[Any, Any] or more specific.
+    # The output type of a GroupByKey will be KV[Any, Any] or more specific.
+    # TODO(BEAM-2717): Infer coders earlier.
     kv_type_hint = (
-        self._applied_ptransform.transform.get_type_hints().input_types[0])
-    self.key_coder = coders.registry.get_coder(kv_type_hint[0].tuple_types[0])
+        self._applied_ptransform.outputs[None].element_type
+        or
+        self._applied_ptransform.transform.get_type_hints().input_types[0][0])
+    self.key_coder = coders.registry.get_coder(kv_type_hint.tuple_types[0])
 
   def process_timer(self, timer_firing):
     # We do not need to emit a KeyedWorkItem to process_element().

http://git-wip-us.apache.org/repos/asf/beam/blob/38556b78/sdks/python/apache_beam/runners/portability/fn_api_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 463f78f..838ce1e 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -65,7 +65,8 @@ def streaming_rpc_handler(cls, method_name):
       self._push_queue = queue.Queue()
       self._pull_queue = queue.Queue()
       setattr(self, method_name, self.run)
-      self._read_thread = threading.Thread(target=self._read)
+      self._read_thread = threading.Thread(
+          name='streaming_rpc_handler_read', target=self._read)
       self._started = False
 
     def run(self, iterator, context):
@@ -155,6 +156,35 @@ class _GroupingBuffer(object):
     return iter([output_stream.get()])
 
 
+class _WindowGroupingBuffer(object):
+  """Used to partition windowed side inputs."""
+  def __init__(self, side_input_data):
+    # Here's where we would use a different type of partitioning
+    # (e.g. also by key) for a different access pattern.
+    assert side_input_data.access_pattern == urns.ITERABLE_ACCESS
+    self._windowed_value_coder = side_input_data.coder
+    self._window_coder = side_input_data.coder.window_coder
+    self._value_coder = side_input_data.coder.wrapped_value_coder
+    self._values_by_window = collections.defaultdict(list)
+
+  def append(self, elements_data):
+    input_stream = create_InputStream(elements_data)
+    while input_stream.size() > 0:
+      windowed_value = self._windowed_value_coder.get_impl(
+          ).decode_from_stream(input_stream, True)
+      for window in windowed_value.windows:
+        self._values_by_window[window].append(windowed_value.value)
+
+  def items(self):
+    value_coder_impl = self._value_coder.get_impl()
+    for window, values in self._values_by_window.items():
+      encoded_window = self._window_coder.encode(window)
+      output_stream = create_OutputStream()
+      for value in values:
+        value_coder_impl.encode_to_stream(value, output_stream, True)
+      yield encoded_window, output_stream.get()
+
+
 class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
 
   def __init__(self, use_grpc=False, sdk_harness_factory=None):
@@ -206,11 +236,14 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
 
       def __repr__(self):
         must_follow = ', '.join(prev.name for prev in self.must_follow)
-        return "%s\n    %s\n    must follow: %s" % (
+        downstream_side_inputs = ', '.join(
+            str(si) for si in self.downstream_side_inputs)
+        return "%s\n  %s\n  must follow: %s\n  downstream_side_inputs: %s" % (
             self.name,
             '\n'.join(["%s:%s" % (transform.unique_name, transform.spec.urn)
                        for transform in self.transforms]),
-            must_follow)
+            must_follow,
+            downstream_side_inputs)
 
       def can_fuse(self, consumer):
         def no_overlap(a, b):
@@ -469,11 +502,12 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
           for transform in stage.transforms:
             for output in transform.outputs.values():
               if output in all_side_inputs:
-                downstream_side_inputs = union(downstream_side_inputs, output)
-                for consumer in consumers[output]:
-                  downstream_side_inputs = union(
-                      downstream_side_inputs,
-                      compute_downstream_side_inputs(consumer))
+                downstream_side_inputs = union(
+                    downstream_side_inputs, frozenset([output]))
+              for consumer in consumers[output]:
+                downstream_side_inputs = union(
+                    downstream_side_inputs,
+                    compute_downstream_side_inputs(consumer))
           downstream_side_inputs_by_stage[stage] = downstream_side_inputs
         return downstream_side_inputs_by_stage[stage]
 
@@ -524,7 +558,7 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
           producer = replacement(producer)
           consumer = replacement(consumer)
           # Update consumer.must_follow set, as it's used in can_fuse.
-          consumer.must_follow = set(
+          consumer.must_follow = frozenset(
               replacement(s) for s in consumer.must_follow)
           if producer.can_fuse(consumer):
             fuse(producer, consumer)
@@ -549,8 +583,11 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
                       spec=beam_runner_api_pb2.FunctionSpec(
                           urn=bundle_processor.DATA_INPUT_URN,
                           payload=pcoll_as_param))],
-                  must_follow={write_pcoll})
+                  must_follow=frozenset([write_pcoll]))
               fuse(read_pcoll, consumer)
+            else:
+              consumer.must_follow = union(
+                  consumer.must_follow, frozenset([write_pcoll]))
 
       # Everything that was originally a stage or a replacement, but wasn't
       # replaced, should be in the final graph.
@@ -658,9 +695,9 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
       data_side_input = {}
       data_output = {}
       for transform in stage.transforms:
-        pcoll_id = transform.spec.payload
         if transform.spec.urn in (bundle_processor.DATA_INPUT_URN,
                                   bundle_processor.DATA_OUTPUT_URN):
+          pcoll_id = transform.spec.payload
           if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
             target = transform.unique_name, only_element(transform.outputs)
             data_input[target] = pcoll_id
@@ -673,13 +710,18 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
             transform.spec.payload = data_operation_spec.SerializeToString()
           else:
             transform.spec.payload = ""
+        elif transform.spec.urn == urns.PARDO_TRANSFORM:
+          payload = proto_utils.parse_Bytes(
+              transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
+          for tag, si in payload.side_inputs.items():
+            data_side_input[transform.unique_name, tag] = (
+                'materialize:' + transform.inputs[tag],
+                beam.pvalue.SideInputData.from_runner_api(si, None))
       return data_input, data_side_input, data_output
 
     logging.info('Running %s', stage.name)
     logging.debug('       %s', stage)
     data_input, data_side_input, data_output = extract_endpoints(stage)
-    if data_side_input:
-      raise NotImplementedError('Side inputs.')
 
     process_bundle_descriptor = beam_fn_api_pb2.ProcessBundleDescriptor(
         id=self._next_uid(),
@@ -711,6 +753,20 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
         data_out.write(element_data)
       data_out.close()
 
+    # Store the required side inputs into state.
+    for (transform_id, tag), (pcoll_id, si) in data_side_input.items():
+      elements_by_window = _WindowGroupingBuffer(si)
+      for element_data in pcoll_buffers[pcoll_id]:
+        elements_by_window.append(element_data)
+      for window, elements_data in elements_by_window.items():
+        state_key = beam_fn_api_pb2.StateKey(
+            multimap_side_input=beam_fn_api_pb2.StateKey.MultimapSideInput(
+                ptransform_id=transform_id,
+                side_input_id=tag,
+                window=window))
+        controller.state_handler.blocking_append(
+            state_key, elements_data, process_bundle.instruction_id)
+
     # Register and start running the bundle.
     controller.control_handler.push(process_bundle_registration)
     controller.control_handler.push(process_bundle)
@@ -975,34 +1031,56 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
 
   # These classes are used to interact with the worker.
 
-  class SimpleState(object):  # TODO(robertwb): Inherit from GRPC servicer.
+  class StateServicer(beam_fn_api_pb2_grpc.BeamFnStateServicer):
 
     def __init__(self):
-      self._all = collections.defaultdict(list)
+      self._lock = threading.Lock()
+      self._state = collections.defaultdict(list)
 
-    def Get(self, state_key):
-      return beam_fn_api_pb2.Elements.Data(
-          data=''.join(self._all[self._to_key(state_key)]))
+    def blocking_get(self, state_key, instruction_reference=None):
+      with self._lock:
+        return ''.join(self._state[self._to_key(state_key)])
 
-    def Append(self, state_key, data):
-      self._all[self._to_key(state_key)].extend(data)
+    def blocking_append(self, state_key, data, instruction_reference=None):
+      with self._lock:
+        self._state[self._to_key(state_key)].append(data)
 
-    def Clear(self, state_key):
-      try:
-        del self._all[self._to_key(state_key)]
-      except KeyError:
-        pass
+    def blocking_clear(self, state_key, instruction_reference=None):
+      with self._lock:
+        del self._state[self._to_key(state_key)]
 
     @staticmethod
     def _to_key(state_key):
-      return state_key.window, state_key.key
+      return state_key.SerializeToString()
+
+  class GrpcStateServicer(
+      StateServicer, beam_fn_api_pb2_grpc.BeamFnStateServicer):
+    def State(self, request_stream, context=None):
+      # Note that this eagerly mutates state, assuming any failures are fatal.
+      # Thus it is safe to ignore instruction_reference.
+      for request in request_stream:
+        if request.get:
+          yield beam_fn_api_pb2.StateResponse(
+              id=request.id,
+              get=beam_fn_api_pb2.StateGetResponse(
+                  data=self.blocking_get(request.state_key)))
+        elif request.append:
+          self.blocking_append(request.state_key, request.append.data)
+          yield beam_fn_api_pb2.StateResponse(
+              id=request.id,
+              append=beam_fn_api_pb2.AppendResponse())
+        elif request.clear:
+          self.blocking_clear(request.state_key)
+          yield beam_fn_api_pb2.StateResponse(
+              id=request.id,
+              clear=beam_fn_api_pb2.ClearResponse())
 
   class DirectController(object):
     """An in-memory controller for fn API control, state and data planes."""
 
     def __init__(self):
       self._responses = []
-      self.state_handler = FnApiRunner.SimpleState()
+      self.state_handler = FnApiRunner.StateServicer()
       self.control_handler = self
       self.data_plane_handler = data_plane.InMemoryDataChannel()
       self.worker = sdk_worker.SdkWorker(
@@ -1032,7 +1110,6 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
 
     def __init__(self, sdk_harness_factory=None):
       self.sdk_harness_factory = sdk_harness_factory
-      self.state_handler = FnApiRunner.SimpleState()
       self.control_server = grpc.server(
           futures.ThreadPoolExecutor(max_workers=10))
       self.control_port = self.control_server.add_insecure_port('[::]:0')
@@ -1049,6 +1126,12 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
       beam_fn_api_pb2_grpc.add_BeamFnDataServicer_to_server(
           self.data_plane_handler, self.data_server)
 
+      # TODO(robertwb): Is sharing the control channel fine?  Alternatively,
+      # how should this be plumbed?
+      self.state_handler = FnApiRunner.GrpcStateServicer()
+      beam_fn_api_pb2_grpc.add_BeamFnStateServicer_to_server(
+          self.state_handler, self.control_server)
+
       logging.info('starting control server on port %s', self.control_port)
       logging.info('starting data server on port %s', self.data_port)
       self.data_server.start()
@@ -1056,7 +1139,8 @@ class FnApiRunner(maptask_executor_runner.MapTaskExecutorRunner):
 
       self.worker = (self.sdk_harness_factory or sdk_worker.SdkHarness)(
           'localhost:%s' % self.control_port)
-      self.worker_thread = threading.Thread(target=self.worker.run)
+      self.worker_thread = threading.Thread(
+          name='run_worker', target=self.worker.run)
       logging.info('starting worker')
       self.worker_thread.start()
 

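In fn_api_runner.py the side-input PCollection is fully materialized before the consuming bundle runs: its encoded elements are regrouped per window by _WindowGroupingBuffer and appended to the state service under a MultimapSideInput key of (ptransform_id, side_input_id, encoded window). A stripped-down sketch of that step, with plain Python objects standing in for the coders and the state service (the names below are illustrative, not Beam APIs):

    import collections

    state = collections.defaultdict(list)   # (transform_id, tag, window) -> values

    def materialize_side_input(transform_id, tag, windowed_values):
        # Partition values by the window(s) they were assigned to ...
        by_window = collections.defaultdict(list)
        for value, windows in windowed_values:
            for window in windows:
                by_window[window].append(value)
        # ... then append each window's values under its own state key.
        for window, values in by_window.items():
            state[transform_id, tag, window].extend(values)

    materialize_side_input('pardo1', 'side0', [('x', ['w1']), ('y', ['w1', 'w2'])])
    assert state['pardo1', 'side0', 'w1'] == ['x', 'y']
    assert state['pardo1', 'side0', 'w2'] == ['y']
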
http://git-wip-us.apache.org/repos/asf/beam/blob/38556b78/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 31f1b6f..ea9ed1a 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -24,6 +24,7 @@ from apache_beam.runners.portability import fn_api_runner
 from apache_beam.runners.portability import maptask_executor_runner_test
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
+from apache_beam.transforms import window
 
 try:
   from apache_beam.runners.worker.statesampler import DEFAULT_SAMPLING_PERIOD_MS
@@ -47,12 +48,40 @@ class FnApiRunnerTest(
     pass
 
   def test_pardo_side_inputs(self):
-    # TODO(BEAM-1348): Enable once side inputs are supported in fn API.
-    pass
-
-  def test_pardo_unfusable_side_inputs(self):
-    # TODO(BEAM-1348): Enable once side inputs are supported in fn API.
-    pass
+    def cross_product(elem, sides):
+      for side in sides:
+        yield elem, side
+    with self.create_pipeline() as p:
+      main = p | 'main' >> beam.Create(['a', 'b', 'c'])
+      side = p | 'side' >> beam.Create(['x', 'y'])
+      assert_that(main | beam.FlatMap(cross_product, beam.pvalue.AsList(side)),
+                  equal_to([('a', 'x'), ('b', 'x'), ('c', 'x'),
+                            ('a', 'y'), ('b', 'y'), ('c', 'y')]))
+
+      # Now with some windowing.
+      pcoll = p | beam.Create(range(10)) | beam.Map(
+          lambda t: window.TimestampedValue(t, t))
+      # Intentionally choosing non-aligned windows to highlight the transition.
+      main = pcoll | 'WindowMain' >> beam.WindowInto(window.FixedWindows(5))
+      side = pcoll | 'WindowSide' >> beam.WindowInto(window.FixedWindows(7))
+      res = main | beam.Map(lambda x, s: (x, sorted(s)),
+                            beam.pvalue.AsList(side))
+      assert_that(
+          res,
+          equal_to([
+              # The window [0, 5) maps to the window [0, 7).
+              (0, range(7)),
+              (1, range(7)),
+              (2, range(7)),
+              (3, range(7)),
+              (4, range(7)),
+              # The window [5, 10) maps to the window [7, 14).
+              (5, range(7, 10)),
+              (6, range(7, 10)),
+              (7, range(7, 10)),
+              (8, range(7, 10)),
+              (9, range(7, 10))]),
+          label='windowed')
 
   def test_assert_that(self):
     # TODO: figure out a way for fn_api_runner to parse and raise the

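The expected values in the windowed half of test_pardo_side_inputs follow from the side-input window mapping: with FixedWindows(5) on the main input and FixedWindows(7) on the side input, each main window is (under the default mapping assumed here) matched to the side window containing its maximum timestamp, so [0, 5) sees the side elements in [0, 7) and [5, 10) sees those in [7, 14). The arithmetic, as a quick sketch:

    def fixed_window(timestamp, size):
        start = timestamp - timestamp % size
        return (start, start + size)

    def map_main_to_side(main_window, side_size):
        # Assumed default mapping: the side window containing the main
        # window's maximum timestamp (just before its end).
        max_timestamp = main_window[1] - 0.001
        return fixed_window(max_timestamp, side_size)

    assert map_main_to_side((0, 5), 7) == (0, 7)     # side elements 0..6
    assert map_main_to_side((5, 10), 7) == (7, 14)   # side elements 7..9
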
http://git-wip-us.apache.org/repos/asf/beam/blob/38556b78/sdks/python/apache_beam/runners/worker/bundle_processor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 1049ae1..689eab7 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -39,6 +39,7 @@ from apache_beam.runners import pipeline_context
 from apache_beam.runners.dataflow.native_io import iobase as native_iobase
 from apache_beam.runners.worker import operation_specs
 from apache_beam.runners.worker import operations
+from apache_beam.transforms import sideinputs
 from apache_beam.utils import counters
 from apache_beam.utils import proto_utils
 from apache_beam.utils import urns
@@ -62,10 +63,6 @@ OLD_DATAFLOW_RUNNER_HARNESS_PARDO_URN = 'urn:beam:dofn:javasdk:0.1'
 OLD_DATAFLOW_RUNNER_HARNESS_READ_URN = 'urn:org.apache.beam:source:java:0.1'
 
 
-def side_input_tag(transform_id, tag):
-  return str("%d[%s][%s]" % (len(transform_id), transform_id, tag))
-
-
 class RunnerIOOperation(operations.Operation):
   """Common baseclass for runner harness IO operations."""
 
@@ -162,6 +159,44 @@ class SideInputSource(native_iobase.NativeSource,
       yield self._coder.get_impl().decode_from_stream(input_stream, True)
 
 
+class StateBackedSideInputMap(object):
+  def __init__(self, state_handler, transform_id, tag, side_input_data):
+    self._state_handler = state_handler
+    self._transform_id = transform_id
+    self._tag = tag
+    self._side_input_data = side_input_data
+    self._element_coder = side_input_data.coder.wrapped_value_coder
+    self._target_window_coder = side_input_data.coder.window_coder
+    # TODO(robertwb): Limit the cache size.
+    # TODO(robertwb): Cross-bundle caching respecting cache tokens.
+    self._cache = {}
+
+  def __getitem__(self, window):
+    target_window = self._side_input_data.window_mapping_fn(window)
+    if target_window not in self._cache:
+      state_key = beam_fn_api_pb2.StateKey(
+          multimap_side_input=beam_fn_api_pb2.StateKey.MultimapSideInput(
+              ptransform_id=self._transform_id,
+              side_input_id=self._tag,
+              window=self._target_window_coder.encode(target_window)))
+      element_coder_impl = self._element_coder.get_impl()
+      state_handler = self._state_handler
+
+      class AllElements(object):
+        def __iter__(self):
+          # TODO(robertwb): Support pagination.
+          input_stream = coder_impl.create_InputStream(
+              state_handler.blocking_get(state_key, None))
+          while input_stream.size() > 0:
+            yield element_coder_impl.decode_from_stream(input_stream, True)
+      self._cache[target_window] = self._side_input_data.view_fn(AllElements())
+    return self._cache[target_window]
+
+  def is_globally_windowed(self):
+    return (self._side_input_data.window_mapping_fn
+            == sideinputs._global_window_mapping_fn)
+
+
 def memoize(func):
   cache = {}
   missing = object()
@@ -201,10 +236,17 @@ class BundleProcessor(object):
         descriptor, self.data_channel_factory, self.counter_factory,
         self.state_sampler, self.state_handler)
 
+    def is_side_input(transform_proto, tag):
+      if transform_proto.spec.urn == urns.PARDO_TRANSFORM:
+        return tag in proto_utils.parse_Bytes(
+            transform_proto.spec.payload,
+            beam_runner_api_pb2.ParDoPayload).side_inputs
+
     pcoll_consumers = collections.defaultdict(list)
     for transform_id, transform_proto in descriptor.transforms.items():
-      for pcoll_id in transform_proto.inputs.values():
-        pcoll_consumers[pcoll_id].append(transform_id)
+      for tag, pcoll_id in transform_proto.inputs.items():
+        if not is_side_input(transform_proto, tag):
+          pcoll_consumers[pcoll_id].append(transform_id)
 
     @memoize
     def get_operation(transform_id):
@@ -412,7 +454,6 @@ def create(factory, transform_id, transform_proto, parameter, consumers):
 @BeamTransformFactory.register_urn(
     urns.READ_TRANSFORM, beam_runner_api_pb2.ReadPayload)
 def create(factory, transform_id, transform_proto, parameter, consumers):
-  # The Dataflow runner harness strips the base64 encoding.
   source = iobase.SourceBase.from_runner_api(parameter.source, factory.context)
   spec = operation_specs.WorkerRead(
       iobase.SourceBundle(1.0, source, None, None),
@@ -428,17 +469,9 @@ def create(factory, transform_id, transform_proto, parameter, consumers):
 
 
 @BeamTransformFactory.register_urn(OLD_DATAFLOW_RUNNER_HARNESS_PARDO_URN, None)
-def create(factory, transform_id, transform_proto, parameter, consumers):
-  dofn_data = pickler.loads(parameter)
-  if len(dofn_data) == 2:
-    # Has side input data.
-    serialized_fn, side_input_data = dofn_data
-  else:
-    # No side input data.
-    serialized_fn, side_input_data = parameter, []
+def create(factory, transform_id, transform_proto, serialized_fn, consumers):
   return _create_pardo_operation(
-      factory, transform_id, transform_proto, consumers,
-      serialized_fn, side_input_data)
+      factory, transform_id, transform_proto, consumers, serialized_fn)
 
 
 @BeamTransformFactory.register_urn(
@@ -446,31 +479,26 @@ def create(factory, transform_id, transform_proto, parameter, consumers):
 def create(factory, transform_id, transform_proto, parameter, consumers):
   assert parameter.do_fn.spec.urn == urns.PICKLED_DO_FN_INFO
   serialized_fn = parameter.do_fn.spec.payload
-  dofn_data = pickler.loads(serialized_fn)
-  if len(dofn_data) == 2:
-    # Has side input data.
-    serialized_fn, side_input_data = dofn_data
-  else:
-    # No side input data.
-    side_input_data = []
   return _create_pardo_operation(
       factory, transform_id, transform_proto, consumers,
-      serialized_fn, side_input_data)
+      serialized_fn, parameter.side_inputs)
 
 
 def _create_pardo_operation(
     factory, transform_id, transform_proto, consumers,
-    serialized_fn, side_input_data):
-  def create_side_input(tag, coder):
-    # TODO(robertwb): Extract windows (and keys) out of element data.
-    # TODO(robertwb): Extract state key from ParDoPayload.
-    return operation_specs.WorkerSideInputSource(
-        tag=tag,
-        source=SideInputSource(
-            factory.state_handler,
-            beam_fn_api_pb2.StateKey.MultimapSideInput(
-                key=side_input_tag(transform_id, tag)),
-            coder=coder))
+    serialized_fn, side_inputs_proto=None):
+
+  if side_inputs_proto:
+    tagged_side_inputs = [
+        (tag, beam.pvalue.SideInputData.from_runner_api(si, factory.context))
+        for tag, si in side_inputs_proto.items()]
+    tagged_side_inputs.sort(key=lambda tag_si: int(tag_si[0][4:]))
+    side_input_maps = [
+        StateBackedSideInputMap(factory.state_handler, transform_id, tag, si)
+        for tag, si in tagged_side_inputs]
+  else:
+    side_input_maps = []
+
   output_tags = list(transform_proto.outputs.keys())
 
   # Hack to match out prefix injected by dataflow runner.
@@ -482,27 +510,31 @@ def _create_pardo_operation(
         return 'out_' + tag
     else:
       return tag
+
   dofn_data = pickler.loads(serialized_fn)
   if not dofn_data[-1]:
     # Windowing not set.
-    pcoll_id, = transform_proto.inputs.values()
+    side_input_tags = side_inputs_proto or ()
+    pcoll_id, = [pcoll for tag, pcoll in transform_proto.inputs.items()
+                 if tag not in side_input_tags]
     windowing = factory.context.windowing_strategies.get_by_id(
         factory.descriptor.pcollections[pcoll_id].windowing_strategy_id)
     serialized_fn = pickler.dumps(dofn_data[:-1] + (windowing,))
+
   output_coders = factory.get_output_coders(transform_proto)
   spec = operation_specs.WorkerDoFn(
       serialized_fn=serialized_fn,
       output_tags=[mutate_tag(tag) for tag in output_tags],
       input=None,
-      side_inputs=[
-          create_side_input(tag, coder) for tag, coder in side_input_data],
+      side_inputs=[],  # Obsoleted by side_input_maps.
       output_coders=[output_coders[tag] for tag in output_tags])
   return factory.augment_oldstyle_op(
       operations.DoOperation(
           transform_proto.unique_name,
           spec,
           factory.counter_factory,
-          factory.state_sampler),
+          factory.state_sampler,
+          side_input_maps),
       transform_proto.unique_name,
       consumers,
       output_tags)
@@ -511,10 +543,8 @@ def _create_pardo_operation(
 def _create_simple_pardo_operation(
     factory, transform_id, transform_proto, consumers, dofn):
   serialized_fn = pickler.dumps((dofn, (), {}, [], None))
-  side_input_data = []
   return _create_pardo_operation(
-      factory, transform_id, transform_proto, consumers,
-      serialized_fn, side_input_data)
+      factory, transform_id, transform_proto, consumers, serialized_fn)
 
 
 @BeamTransformFactory.register_urn(

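On the worker side, StateBackedSideInputMap gives the DoFn a lazy, per-window view: the element's window is mapped through window_mapping_fn, the encoded values for the target window are fetched from state, decoded, run through view_fn, and cached. A simplified sketch of that lookup, with a plain dict standing in for the state service and no coders (names below are illustrative only):

    class WindowedSideInputMap(object):
        def __init__(self, fetch_values, window_mapping_fn, view_fn):
            self._fetch_values = fetch_values          # window -> iterable of values
            self._window_mapping_fn = window_mapping_fn
            self._view_fn = view_fn
            self._cache = {}                           # one materialized view per window

        def __getitem__(self, main_window):
            target = self._window_mapping_fn(main_window)
            if target not in self._cache:
                self._cache[target] = self._view_fn(self._fetch_values(target))
            return self._cache[target]

    state = {'w1': [('a', 1), ('b', 2)]}
    side = WindowedSideInputMap(
        lambda window: state.get(window, []),   # stands in for blocking_get + decode
        lambda window: 'w1',                    # stands in for window_mapping_fn
        dict)                                   # AsDict-style view_fn
    assert side['any-main-window'] == {'a': 1, 'b': 2}
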
http://git-wip-us.apache.org/repos/asf/beam/blob/38556b78/sdks/python/apache_beam/runners/worker/data_plane.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py
index 5a511a0..f2a3751 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -25,6 +25,7 @@ import abc
 import collections
 import logging
 import Queue as queue
+import sys
 import threading
 
 import grpc
@@ -147,6 +148,7 @@ class _GrpcDataChannel(DataChannel):
     self._receive_lock = threading.Lock()
     self._reads_finished = threading.Event()
     self._closed = False
+    self._exc_info = None
 
   def close(self):
     self._to_send.put(self._WRITES_FINISHED)
@@ -163,12 +165,17 @@ class _GrpcDataChannel(DataChannel):
     received = self._receiving_queue(instruction_id)
     done_targets = []
     while len(done_targets) < len(expected_targets):
-      data = received.get()
-      if not data.data and data.target in expected_targets:
-        done_targets.append(data.target)
+      try:
+        data = received.get(timeout=1)
+      except queue.Empty:
+        if self._exc_info:
+          raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
       else:
-        assert data.target not in done_targets
-        yield data
+        if not data.data and data.target in expected_targets:
+          done_targets.append(data.target)
+        else:
+          assert data.target not in done_targets
+          yield data
 
   def output_stream(self, instruction_id, target):
     # TODO: Return an output stream that sends data
@@ -215,6 +222,7 @@ class _GrpcDataChannel(DataChannel):
     except:  # pylint: disable=bare-except
       if not self._closed:
         logging.exception('Failed to read inputs in the data plane')
+        self._exc_info = sys.exc_info()
         raise
     finally:
       self._reads_finished.set()

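The data_plane.py change keeps a failure on the background read thread from hanging the consumer forever: the reader records sys.exc_info(), and input_elements polls its queue with a timeout so it can re-raise the stored exception. The pattern in isolation (Python 2 syntax, matching the module; not Beam code):

    import sys
    import threading
    import Queue as queue

    work = queue.Queue()
    stored_exc_info = [None]

    def reader():
        try:
            raise IOError('connection lost')     # stands in for a failed gRPC read
        except Exception:
            stored_exc_info[0] = sys.exc_info()  # record instead of dying silently

    threading.Thread(target=reader).start()

    def consume():
        while True:
            try:
                return work.get(timeout=1)
            except queue.Empty:
                if stored_exc_info[0]:
                    # Re-raise with the original traceback (Python 2 form).
                    raise stored_exc_info[0][0], stored_exc_info[0][1], stored_exc_info[0][2]

    try:
        consume()
    except IOError:
        pass   # the reader's failure surfaces here instead of blocking forever
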
http://git-wip-us.apache.org/repos/asf/beam/blob/38556b78/sdks/python/apache_beam/runners/worker/operations.pxd
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd
index d380a45..cb05c90 100644
--- a/sdks/python/apache_beam/runners/worker/operations.pxd
+++ b/sdks/python/apache_beam/runners/worker/operations.pxd
@@ -72,6 +72,7 @@ cdef class DoOperation(Operation):
   cdef object dofn_runner
   cdef Receiver dofn_receiver
   cdef object tagged_receivers
+  cdef object side_input_maps
 
 cdef class CombineOperation(Operation):
   cdef object phased_combine_fn

http://git-wip-us.apache.org/repos/asf/beam/blob/38556b78/sdks/python/apache_beam/runners/worker/operations.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index ed3f3b8..6b5f024 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -259,6 +259,11 @@ class _TaggedReceivers(dict):
 class DoOperation(Operation):
   """A Do operation that will execute a custom DoFn for each input element."""
 
+  def __init__(
+      self, name, spec, counter_factory, sampler, side_input_maps=None):
+    super(DoOperation, self).__init__(name, spec, counter_factory, sampler)
+    self.side_input_maps = side_input_maps
+
   def _read_side_inputs(self, tags_and_types):
     """Generator reading side inputs in the order prescribed by tags_and_types.
 
@@ -273,6 +278,10 @@ class DoOperation(Operation):
       either in singleton or collection mode according to the tags_and_types
       argument.
     """
+    # Only call this on the old path where side_input_maps was not
+    # provided directly.
+    assert self.side_input_maps is None
+
     # We will read the side inputs in the order prescribed by the
     # tags_and_types argument because this is exactly the order needed to
     # replace the ArgumentPlaceholder objects in the args/kwargs of the DoFn
@@ -336,8 +345,14 @@ class DoOperation(Operation):
           raise ValueError('Unexpected output name for operation: %s' % tag)
         self.tagged_receivers[original_tag] = self.receivers[index]
 
+      if self.side_input_maps is None:
+        if tags_and_types:
+          self.side_input_maps = list(self._read_side_inputs(tags_and_types))
+        else:
+          self.side_input_maps = []
+
       self.dofn_runner = common.DoFnRunner(
-          fn, args, kwargs, self._read_side_inputs(tags_and_types),
+          fn, args, kwargs, self.side_input_maps,
           window_fn, context, self.tagged_receivers,
           logger, self.step_name,
           scoped_metrics_container=self.scoped_metrics_container)

http://git-wip-us.apache.org/repos/asf/beam/blob/38556b78/sdks/python/apache_beam/runners/worker/sdk_worker.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index b08e473..55ecbcc 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -24,6 +24,8 @@ from __future__ import print_function
 import functools
 import logging
 import Queue as queue
+import sys
+import threading
 import traceback
 from concurrent import futures
 
@@ -46,9 +48,10 @@ class SdkHarness(object):
 
   def run(self):
     control_stub = beam_fn_api_pb2_grpc.BeamFnControlStub(self._control_channel)
-    # TODO(robertwb): Wire up to new state api.
-    state_stub = None
-    self.worker = SdkWorker(state_stub, self._data_channel_factory)
+    state_stub = beam_fn_api_pb2_grpc.BeamFnStateStub(self._control_channel)
+    state_handler = GrpcStateHandler(state_stub)
+    state_handler.start()
+    self.worker = SdkWorker(state_handler, self._data_channel_factory)
 
     responses = queue.Queue()
     no_more_work = object()
@@ -102,6 +105,7 @@ class SdkHarness(object):
     # control to its caller.
     responses.put(no_more_work)
     self._data_channel_factory.close()
+    state_handler.done()
     logging.info('Done consuming work.')
 
 
@@ -148,3 +152,104 @@ class SdkWorker(object):
   def process_bundle_progress(self, request, instruction_id):
     # It is an error to get progress for a not-in-flight bundle.
     return self.bundle_processors.get(instruction_id).metrics()
+
+
+class GrpcStateHandler(object):
+
+  _DONE = object()
+
+  def __init__(self, state_stub):
+    self._lock = threading.Lock()
+    self._state_stub = state_stub
+    self._requests = queue.Queue()
+    self._responses_by_id = {}
+    self._last_id = 0
+    self._exc_info = None
+
+  def start(self):
+    self._done = False
+
+    def request_iter():
+      while True:
+        request = self._requests.get()
+        if request is self._DONE or self._done:
+          break
+        yield request
+    responses = self._state_stub.State(request_iter())
+
+    def pull_responses():
+      try:
+        for response in responses:
+          self._responses_by_id[response.id].set(response)
+          if self._done:
+            break
+      except:  # pylint: disable=bare-except
+        self._exc_info = sys.exc_info()
+        raise
+    reader = threading.Thread(target=pull_responses, name='read_state')
+    reader.daemon = True
+    reader.start()
+
+  def done(self):
+    self._done = True
+    self._requests.put(self._DONE)
+
+  def blocking_get(self, state_key, instruction_reference):
+    response = self._blocking_request(
+        beam_fn_api_pb2.StateRequest(
+            instruction_reference=instruction_reference,
+            state_key=state_key,
+            get=beam_fn_api_pb2.StateGetRequest()))
+    if response.get.continuation_token:
+      raise NotImplementedError
+    return response.get.data
+
+  def blocking_append(self, state_key, data, instruction_reference):
+    self._blocking_request(
+        beam_fn_api_pb2.StateRequest(
+            instruction_reference=instruction_reference,
+            state_key=state_key,
+            append=beam_fn_api_pb2.StateAppendRequest(data=data)))
+
+  def blocking_clear(self, state_key, instruction_reference):
+    self._blocking_request(
+        beam_fn_api_pb2.StateRequest(
+            instruction_reference=instruction_reference,
+            state_key=state_key,
+            clear=beam_fn_api_pb2.StateClearRequest()))
+
+  def _blocking_request(self, request):
+    request.id = self._next_id()
+    self._responses_by_id[request.id] = future = _Future()
+    self._requests.put(request)
+    while not future.wait(timeout=1):
+      if self._exc_info:
+        raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
+      elif self._done:
+        raise RuntimeError()
+    del self._responses_by_id[request.id]
+    return future.get()
+
+  def _next_id(self):
+    self._last_id += 1
+    return str(self._last_id)
+
+
+class _Future(object):
+  """A simple future object to implement blocking requests.
+  """
+  def __init__(self):
+    self._event = threading.Event()
+
+  def wait(self, timeout=None):
+    return self._event.wait(timeout)
+
+  def get(self, timeout=None):
+    if self.wait(timeout):
+      return self._value
+    else:
+      raise LookupError()
+
+  def set(self, value):
+    self._value = value
+    self._event.set()

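GrpcStateHandler multiplexes all blocking state calls over one bidirectional stream: each request gets an id, a _Future is parked under that id, and a reader thread resolves futures as responses arrive; the blocking side polls with a timeout so reader failures surface. A bare-bones version of that correlation pattern (threading only, no gRPC; names are illustrative, not Beam APIs):

    import threading
    import Queue as queue   # Python 2, matching the module above

    class SimpleFuture(object):
        def __init__(self):
            self._event = threading.Event()
        def set(self, value):
            self._value = value
            self._event.set()
        def get(self, timeout=None):
            if not self._event.wait(timeout):
                raise LookupError('no response')
            return self._value

    pending = {}                # request id -> SimpleFuture
    requests = queue.Queue()    # outgoing request stream

    def send(request_id, payload):
        pending[request_id] = future = SimpleFuture()
        requests.put((request_id, payload))
        return future

    def respond():              # stands in for the gRPC response stream reader
        while True:
            request_id, payload = requests.get()
            pending[request_id].set(payload.upper())   # fake "server" reply

    reader = threading.Thread(target=respond)
    reader.daemon = True
    reader.start()
    assert send('1', 'ping').get(timeout=5) == 'PING'
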
http://git-wip-us.apache.org/repos/asf/beam/blob/38556b78/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index ff2428e..fbc5f33 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -842,7 +842,7 @@ class ParDo(PTransformWithSideInputs):
     return _MultiParDo(self, tags, main_tag)
 
   def _pardo_fn_data(self):
-    si_tags_and_types = []
+    si_tags_and_types = None
     windowing = None
     return self.fn, self.args, self.kwargs, si_tags_and_types, windowing
 
@@ -855,7 +855,15 @@ class ParDo(PTransformWithSideInputs):
             do_fn=beam_runner_api_pb2.SdkFunctionSpec(
                 spec=beam_runner_api_pb2.FunctionSpec(
                     urn=urns.PICKLED_DO_FN_INFO,
-                    payload=picked_pardo_fn_data))))
+                    payload=picked_pardo_fn_data)),
+            # It'd be nice to name these according to their actual
+            # names/positions in the orignal argument list, but such a
+            # names/positions in the original argument list, but such a
+            # remove_objects_from_args and insert_values_in_args
+            # are currently implemented.
+            side_inputs={
+                "side%s" % ix: si.to_runner_api(context)
+                for ix, si in enumerate(self.side_inputs)}))
 
   @PTransform.register_urn(
       urns.PARDO_TRANSFORM, beam_runner_api_pb2.ParDoPayload)
@@ -864,10 +872,17 @@ class ParDo(PTransformWithSideInputs):
     fn, args, kwargs, si_tags_and_types, windowing = pickler.loads(
         pardo_payload.do_fn.spec.payload)
     if si_tags_and_types:
-      raise NotImplementedError('deferred side inputs')
+      raise NotImplementedError('explicit side input data')
     elif windowing:
       raise NotImplementedError('explicit windowing')
-    return ParDo(fn, *args, **kwargs)
+    result = ParDo(fn, *args, **kwargs)
+    # This is an ordered list stored as a dict (see the comments in
+    # to_runner_api_parameter above).
+    indexed_side_inputs = [
+        (int(ix[4:]), pvalue.AsSideInput.from_runner_api(si, context))
+        for ix, si in pardo_payload.side_inputs.items()]
+    result.side_inputs = [si for _, si in sorted(indexed_side_inputs)]
+    return result
 
 
 class _MultiParDo(PTransform):

http://git-wip-us.apache.org/repos/asf/beam/blob/38556b78/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 2aeaa53..c6135ba 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -58,6 +58,11 @@ LENGTH_PREFIX_CODER = "urn:beam:coders:length_prefix:0.1"
 GLOBAL_WINDOW_CODER = "urn:beam:coders:urn:beam:coders:global_window:0.1"
 WINDOWED_VALUE_CODER = "urn:beam:coders:windowed_value:0.1"
 
+ITERABLE_ACCESS = "urn:beam:sideinput:iterable"
+MULTIMAP_ACCESS = "urn:beam:sideinput:multimap"
+PICKLED_PYTHON_VIEWFN = "beam:view_fn:pickled_python_data:v0.1"
+PICKLED_WINDOW_MAPPING_FN = "beam:window_mapping_fn:pickled_python:v0.1"
+
 
 class RunnerApiFn(object):
   """Abstract base class that provides urn registration utilities.

