beam-commits mailing list archives

From jbono...@apache.org
Subject [03/28] beam git commit: Revert "[BEAM-2610] This closes #3553"
Date Thu, 20 Jul 2017 17:09:30 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index 62c09ed..e656600 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -69,34 +69,20 @@ from apache_beam.utils import processes
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import SetupOptions
 
-# All constants are for internal use only; no backwards-compatibility
-# guarantees.
 
-# In a released version BEAM_CONTAINER_VERSION and BEAM_FNAPI_CONTAINER_VERSION
-# should match each other, and should be in the same format as the SDK version
-# (i.e. MAJOR.MINOR.PATCH). For non-released (dev) versions, read below.
 # Update this version to the next version whenever there is a change that will
-# require changes to legacy Dataflow worker execution environment.
+# require changes to the execution environment.
 # This should be in the beam-[version]-[date] format, date is optional.
-BEAM_CONTAINER_VERSION = 'beam-2.1.0-20170626'
-# Update this version to the next version whenever there is a change that
-# requires changes to SDK harness container or SDK harness launcher.
-# This should be in the beam-[version]-[date] format, date is optional.
-BEAM_FNAPI_CONTAINER_VERSION = 'beam-2.1.0-20170621'
+BEAM_CONTAINER_VERSION = 'beam-2.1.0-20170601'
 
 # Standard file names used for staging files.
 WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
 REQUIREMENTS_FILE = 'requirements.txt'
 EXTRA_PACKAGES_FILE = 'extra_packages.txt'
 
-# Package names for different distributions
 GOOGLE_PACKAGE_NAME = 'google-cloud-dataflow'
 BEAM_PACKAGE_NAME = 'apache-beam'
 
-# SDK identifiers for different distributions
-GOOGLE_SDK_NAME = 'Google Cloud Dataflow SDK for Python'
-BEAM_SDK_NAME = 'Apache Beam SDK for Python'
-
 
 def _dependency_file_copy(from_path, to_path):
   """Copies a local file to a GCS file or vice versa."""
@@ -488,33 +474,10 @@ def _stage_beam_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
         'type of location: %s' % sdk_remote_location)
 
 
-def get_default_container_image_for_current_sdk(job_type):
+def get_required_container_version():
   """For internal use only; no backwards-compatibility guarantees.
 
-  Args:
-    job_type (str): BEAM job type.
-
-  Returns:
-    str: Google Cloud Dataflow container image for remote execution.
-  """
-  # TODO(tvalentyn): Use enumerated type instead of strings for job types.
-  if job_type == 'FNAPI_BATCH' or job_type == 'FNAPI_STREAMING':
-    image_name = 'dataflow.gcr.io/v1beta3/python-fnapi'
-  else:
-    image_name = 'dataflow.gcr.io/v1beta3/python'
-  image_tag = _get_required_container_version(job_type)
-  return image_name + ':' + image_tag
-
-
-def _get_required_container_version(job_type=None):
-  """For internal use only; no backwards-compatibility guarantees.
-
-  Args:
-    job_type (str, optional): BEAM job type. Defaults to None.
-
-  Returns:
-    str: The tag of worker container images in GCR that corresponds to
-      current version of the SDK.
+  Returns the Google Cloud Dataflow container version for remote execution.
   """
   # TODO(silviuc): Handle apache-beam versions when we have official releases.
   import pkg_resources as pkg
@@ -530,34 +493,28 @@ def _get_required_container_version(job_type=None):
   except pkg.DistributionNotFound:
     # This case covers Apache Beam end-to-end testing scenarios. All these tests
     # will run with a special container version.
-    if job_type == 'FNAPI_BATCH' or job_type == 'FNAPI_STREAMING':
-      return BEAM_FNAPI_CONTAINER_VERSION
-    else:
-      return BEAM_CONTAINER_VERSION
+    return BEAM_CONTAINER_VERSION
 
 
 def get_sdk_name_and_version():
   """For internal use only; no backwards-compatibility guarantees.
 
   Returns name and version of SDK reported to Google Cloud Dataflow."""
-  import pkg_resources as pkg
-  container_version = _get_required_container_version()
-  try:
-    pkg.get_distribution(GOOGLE_PACKAGE_NAME)
-    return (GOOGLE_SDK_NAME, container_version)
-  except pkg.DistributionNotFound:
-    return (BEAM_SDK_NAME, beam_version.__version__)
+  # TODO(ccy): Make this check cleaner.
+  container_version = get_required_container_version()
+  if container_version == BEAM_CONTAINER_VERSION:
+    return ('Apache Beam SDK for Python', beam_version.__version__)
+  return ('Google Cloud Dataflow SDK for Python', container_version)
 
 
 def get_sdk_package_name():
   """For internal use only; no backwards-compatibility guarantees.
 
   Returns the PyPI package name to be staged to Google Cloud Dataflow."""
-  sdk_name, _ = get_sdk_name_and_version()
-  if sdk_name == GOOGLE_SDK_NAME:
-    return GOOGLE_PACKAGE_NAME
-  else:
+  container_version = get_required_container_version()
+  if container_version == BEAM_CONTAINER_VERSION:
     return BEAM_PACKAGE_NAME
+  return GOOGLE_PACKAGE_NAME
 
 
 def _download_pypi_sdk_package(temp_dir):

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py
index 3d8c24f..7610baf 100644
--- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py
@@ -20,9 +20,7 @@
 
 import unittest
 
-from apache_beam import Create
-from apache_beam import error
-from apache_beam import pvalue
+from apache_beam import error, pvalue
 from apache_beam.runners.dataflow.native_io.iobase import (
     _dict_printable_fields,
     _NativeWrite,
@@ -30,12 +28,10 @@ from apache_beam.runners.dataflow.native_io.iobase import (
     DynamicSplitRequest,
     DynamicSplitResultWithPosition,
     NativeSink,
-    NativeSinkWriter,
     NativeSource,
     ReaderPosition,
     ReaderProgress
 )
-from apache_beam.testing.test_pipeline import TestPipeline
 
 
 class TestHelperFunctions(unittest.TestCase):
@@ -158,39 +154,6 @@ class TestNativeSink(unittest.TestCase):
     fake_sink = FakeSink()
     self.assertEqual(fake_sink.__repr__(), "<FakeSink ['validate=False']>")
 
-  def test_on_direct_runner(self):
-    class FakeSink(NativeSink):
-      """A fake sink outputing a number of elements."""
-
-      def __init__(self):
-        self.written_values = []
-        self.writer_instance = FakeSinkWriter(self.written_values)
-
-      def writer(self):
-        return self.writer_instance
-
-    class FakeSinkWriter(NativeSinkWriter):
-      """A fake sink writer for testing."""
-
-      def __init__(self, written_values):
-        self.written_values = written_values
-
-      def __enter__(self):
-        return self
-
-      def __exit__(self, *unused_args):
-        pass
-
-      def Write(self, value):
-        self.written_values.append(value)
-
-    p = TestPipeline()
-    sink = FakeSink()
-    p | Create(['a', 'b', 'c']) | _NativeWrite(sink)  # pylint: disable=expression-not-assigned
-    p.run()
-
-    self.assertEqual(['a', 'b', 'c'], sink.written_values)
-
 
 class Test_NativeWrite(unittest.TestCase):
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py b/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py
deleted file mode 100644
index 8c6c8d6..0000000
--- a/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py
+++ /dev/null
@@ -1,72 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Create transform for streaming."""
-
-from apache_beam import pvalue
-from apache_beam import DoFn
-from apache_beam import ParDo
-from apache_beam import PTransform
-from apache_beam import Windowing
-from apache_beam.transforms.window import GlobalWindows
-
-
-class StreamingCreate(PTransform):
-  """A specialized implementation for ``Create`` transform in streaming mode.
-
-  Note: There is no unbounded source API in python to wrap the Create source,
-  so we map this to composite of Impulse primitive and an SDF.
-  """
-
-  def __init__(self, values, coder):
-    self.coder = coder
-    self.encoded_values = map(coder.encode, values)
-
-  class DecodeAndEmitDoFn(DoFn):
-    """A DoFn which stores encoded versions of elements.
-
-    It also stores a Coder to decode and emit those elements.
-    TODO: BEAM-2422 - Make this a SplittableDoFn.
-    """
-
-    def __init__(self, encoded_values, coder):
-      self.encoded_values = encoded_values
-      self.coder = coder
-
-    def process(self, unused_element):
-      for encoded_value in self.encoded_values:
-        yield self.coder.decode(encoded_value)
-
-  class Impulse(PTransform):
-    """The Dataflow specific override for the impulse primitive."""
-
-    def expand(self, pbegin):
-      assert isinstance(pbegin, pvalue.PBegin), (
-          'Input to Impulse transform must be a PBegin but found %s' % pbegin)
-      return pvalue.PCollection(pbegin.pipeline)
-
-    def get_windowing(self, inputs):
-      return Windowing(GlobalWindows())
-
-    def infer_output_type(self, unused_input_type):
-      return bytes
-
-  def expand(self, pbegin):
-    return (pbegin
-            | 'Impulse' >> self.Impulse()
-            | 'Decode Values' >> ParDo(
-                self.DecodeAndEmitDoFn(self.encoded_values, self.coder)))

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
deleted file mode 100644
index 680a4b7..0000000
--- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
+++ /dev/null
@@ -1,52 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Ptransform overrides for DataflowRunner."""
-
-from apache_beam.coders import typecoders
-from apache_beam.pipeline import PTransformOverride
-
-
-class CreatePTransformOverride(PTransformOverride):
-  """A ``PTransformOverride`` for ``Create`` in streaming mode."""
-
-  def get_matcher(self):
-    return self.is_streaming_create
-
-  @staticmethod
-  def is_streaming_create(applied_ptransform):
-    # Imported here to avoid circular dependencies.
-    # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam import Create
-    from apache_beam.options.pipeline_options import StandardOptions
-
-    if isinstance(applied_ptransform.transform, Create):
-      standard_options = (applied_ptransform
-                          .outputs[None]
-                          .pipeline._options
-                          .view_as(StandardOptions))
-      return standard_options.streaming
-    else:
-      return False
-
-  def get_replacement_transform(self, ptransform):
-    # Imported here to avoid circular dependencies.
-    # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam.runners.dataflow.native_io.streaming_create import \
-      StreamingCreate
-    coder = typecoders.registry.get_coder(ptransform.get_output_type())
-    return StreamingCreate(ptransform.value, coder)

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/direct/bundle_factory.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/bundle_factory.py b/sdks/python/apache_beam/runners/direct/bundle_factory.py
index 0182b4c..ed00b03 100644
--- a/sdks/python/apache_beam/runners/direct/bundle_factory.py
+++ b/sdks/python/apache_beam/runners/direct/bundle_factory.py
@@ -108,7 +108,7 @@ class _Bundle(object):
                             self._initial_windowed_value.windows)
 
   def __init__(self, pcollection, stacked=True):
-    assert isinstance(pcollection, (pvalue.PBegin, pvalue.PCollection))
+    assert isinstance(pcollection, pvalue.PCollection)
     self._pcollection = pcollection
     self._elements = []
     self._stacked = stacked

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 1a94b3d..ecf5114 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -26,55 +26,22 @@ from __future__ import absolute_import
 import collections
 import logging
 
-import apache_beam as beam
-from apache_beam import typehints
 from apache_beam.metrics.execution import MetricsEnvironment
-from apache_beam.pvalue import PCollection
 from apache_beam.runners.direct.bundle_factory import BundleFactory
 from apache_beam.runners.runner import PipelineResult
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import PValueCache
-from apache_beam.transforms.core import _GroupAlsoByWindow
-from apache_beam.transforms.core import _GroupByKeyOnly
 from apache_beam.options.pipeline_options import DirectOptions
-from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.value_provider import RuntimeValueProvider
 
 
 __all__ = ['DirectRunner']
 
 
-# Type variables.
-K = typehints.TypeVariable('K')
-V = typehints.TypeVariable('V')
-
-
-@typehints.with_input_types(typehints.KV[K, V])
-@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
-class _StreamingGroupByKeyOnly(_GroupByKeyOnly):
-  """Streaming GroupByKeyOnly placeholder for overriding in DirectRunner."""
-  pass
-
-
-@typehints.with_input_types(typehints.KV[K, typehints.Iterable[V]])
-@typehints.with_output_types(typehints.KV[K, typehints.Iterable[V]])
-class _StreamingGroupAlsoByWindow(_GroupAlsoByWindow):
-  """Streaming GroupAlsoByWindow placeholder for overriding in DirectRunner."""
-  pass
-
-
 class DirectRunner(PipelineRunner):
   """Executes a single pipeline on the local machine."""
 
-  # A list of PTransformOverride objects to be applied before running a pipeline
-  # using DirectRunner.
-  # Currently this only works for overrides where the input and output types do
-  # not change.
-  # For internal SDK use only. This should not be updated by Beam pipeline
-  # authors.
-  _PTRANSFORM_OVERRIDES = []
-
   def __init__(self):
     self._cache = None
 
@@ -89,84 +56,9 @@ class DirectRunner(PipelineRunner):
     except NotImplementedError:
       return transform.expand(pcoll)
 
-  def apply__GroupByKeyOnly(self, transform, pcoll):
-    if (transform.__class__ == _GroupByKeyOnly and
-        pcoll.pipeline._options.view_as(StandardOptions).streaming):
-      # Use specialized streaming implementation, if requested.
-      type_hints = transform.get_type_hints()
-      return pcoll | (_StreamingGroupByKeyOnly()
-                      .with_input_types(*type_hints.input_types[0])
-                      .with_output_types(*type_hints.output_types[0]))
-    return transform.expand(pcoll)
-
-  def apply__GroupAlsoByWindow(self, transform, pcoll):
-    if (transform.__class__ == _GroupAlsoByWindow and
-        pcoll.pipeline._options.view_as(StandardOptions).streaming):
-      # Use specialized streaming implementation, if requested.
-      type_hints = transform.get_type_hints()
-      return pcoll | (_StreamingGroupAlsoByWindow(transform.windowing)
-                      .with_input_types(*type_hints.input_types[0])
-                      .with_output_types(*type_hints.output_types[0]))
-    return transform.expand(pcoll)
-
-  def apply_ReadStringsFromPubSub(self, transform, pcoll):
-    try:
-      from google.cloud import pubsub as unused_pubsub
-    except ImportError:
-      raise ImportError('Google Cloud PubSub not available, please install '
-                        'apache_beam[gcp]')
-    # Execute this as a native transform.
-    output = PCollection(pcoll.pipeline)
-    output.element_type = unicode
-    return output
-
-  def apply_WriteStringsToPubSub(self, transform, pcoll):
-    try:
-      from google.cloud import pubsub
-    except ImportError:
-      raise ImportError('Google Cloud PubSub not available, please install '
-                        'apache_beam[gcp]')
-    project = transform._sink.project
-    topic_name = transform._sink.topic_name
-
-    class DirectWriteToPubSub(beam.DoFn):
-      _topic = None
-
-      def __init__(self, project, topic_name):
-        self.project = project
-        self.topic_name = topic_name
-
-      def start_bundle(self):
-        if self._topic is None:
-          self._topic = pubsub.Client(project=self.project).topic(
-              self.topic_name)
-        self._buffer = []
-
-      def process(self, elem):
-        self._buffer.append(elem.encode('utf-8'))
-        if len(self._buffer) >= 100:
-          self._flush()
-
-      def finish_bundle(self):
-        self._flush()
-
-      def _flush(self):
-        if self._buffer:
-          with self._topic.batch() as batch:
-            for datum in self._buffer:
-              batch.publish(datum)
-          self._buffer = []
-
-    output = pcoll | beam.ParDo(DirectWriteToPubSub(project, topic_name))
-    output.element_type = unicode
-    return output
-
   def run(self, pipeline):
     """Execute the entire pipeline and returns an DirectPipelineResult."""
 
-    # Performing configured PTransform overrides.
-    pipeline.replace_all(DirectRunner._PTRANSFORM_OVERRIDES)
-
     # TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
     # with resolving imports when they are at top.
     # pylint: disable=wrong-import-position

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/direct/evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 54c407c..68d99d3 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -27,22 +27,22 @@ from apache_beam.runners.direct.clock import Clock
 from apache_beam.runners.direct.watermark_manager import WatermarkManager
 from apache_beam.runners.direct.executor import TransformExecutor
 from apache_beam.runners.direct.direct_metrics import DirectMetrics
-from apache_beam.transforms.trigger import InMemoryUnmergedState
 from apache_beam.utils import counters
 
 
 class _ExecutionContext(object):
 
-  def __init__(self, watermarks, keyed_states):
-    self.watermarks = watermarks
-    self.keyed_states = keyed_states
+  def __init__(self, watermarks, existing_state):
+    self._watermarks = watermarks
+    self._existing_state = existing_state
 
-    self._step_context = None
+  @property
+  def watermarks(self):
+    return self._watermarks
 
-  def get_step_context(self):
-    if not self._step_context:
-      self._step_context = DirectStepContext(self.keyed_states)
-    return self._step_context
+  @property
+  def existing_state(self):
+    return self._existing_state
 
 
 class _SideInputView(object):
@@ -145,11 +145,11 @@ class EvaluationContext(object):
     self._pcollection_to_views = collections.defaultdict(list)
     for view in views:
       self._pcollection_to_views[view.pvalue].append(view)
-    self._transform_keyed_states = self._initialize_keyed_states(
-        root_transforms, value_to_consumers)
+
+    # AppliedPTransform -> Evaluator specific state objects
+    self._application_state_interals = {}
     self._watermark_manager = WatermarkManager(
-        Clock(), root_transforms, value_to_consumers,
-        self._transform_keyed_states)
+        Clock(), root_transforms, value_to_consumers)
     self._side_inputs_container = _SideInputsContainer(views)
     self._pending_unblocked_tasks = []
     self._counter_factory = counters.CounterFactory()
@@ -158,15 +158,6 @@ class EvaluationContext(object):
 
     self._lock = threading.Lock()
 
-  def _initialize_keyed_states(self, root_transforms, value_to_consumers):
-    transform_keyed_states = {}
-    for transform in root_transforms:
-      transform_keyed_states[transform] = {}
-    for consumers in value_to_consumers.values():
-      for consumer in consumers:
-        transform_keyed_states[consumer] = {}
-    return transform_keyed_states
-
   def use_pvalue_cache(self, cache):
     assert not self._cache
     self._cache = cache
@@ -208,12 +199,11 @@ class EvaluationContext(object):
       the committed bundles contained within the handled result.
     """
     with self._lock:
-      committed_bundles, unprocessed_bundles = self._commit_bundles(
-          result.uncommitted_output_bundles,
-          result.unprocessed_bundles)
+      committed_bundles = self._commit_bundles(
+          result.uncommitted_output_bundles)
       self._watermark_manager.update_watermarks(
           completed_bundle, result.transform, completed_timers,
-          committed_bundles, unprocessed_bundles, result.keyed_watermark_holds)
+          committed_bundles, result.watermark_hold)
 
       self._metrics.commit_logical(completed_bundle,
                                    result.logical_metric_updates)
@@ -241,6 +231,7 @@ class EvaluationContext(object):
               counter.name, counter.combine_fn)
           merged_counter.accumulator.merge([counter.accumulator])
 
+      self._application_state_interals[result.transform] = result.state
       return committed_bundles
 
   def get_aggregator_values(self, aggregator_or_name):
@@ -253,22 +244,19 @@ class EvaluationContext(object):
           executor_service.submit(task)
         self._pending_unblocked_tasks = []
 
-  def _commit_bundles(self, uncommitted_bundles, unprocessed_bundles):
+  def _commit_bundles(self, uncommitted_bundles):
     """Commits bundles and returns a immutable set of committed bundles."""
     for in_progress_bundle in uncommitted_bundles:
       producing_applied_ptransform = in_progress_bundle.pcollection.producer
       watermarks = self._watermark_manager.get_watermarks(
           producing_applied_ptransform)
       in_progress_bundle.commit(watermarks.synchronized_processing_output_time)
-
-    for unprocessed_bundle in unprocessed_bundles:
-      unprocessed_bundle.commit(None)
-    return tuple(uncommitted_bundles), tuple(unprocessed_bundles)
+    return tuple(uncommitted_bundles)
 
   def get_execution_context(self, applied_ptransform):
     return _ExecutionContext(
         self._watermark_manager.get_watermarks(applied_ptransform),
-        self._transform_keyed_states[applied_ptransform])
+        self._application_state_interals.get(applied_ptransform))
 
   def create_bundle(self, output_pcollection):
     """Create an uncommitted bundle for the specified PCollection."""
@@ -308,24 +296,3 @@ class EvaluationContext(object):
     assert isinstance(task, TransformExecutor)
     return self._side_inputs_container.get_value_or_schedule_after_output(
         side_input, task)
-
-
-class DirectUnmergedState(InMemoryUnmergedState):
-  """UnmergedState implementation for the DirectRunner."""
-
-  def __init__(self):
-    super(DirectUnmergedState, self).__init__(defensive_copy=False)
-
-
-class DirectStepContext(object):
-  """Context for the currently-executing step."""
-
-  def __init__(self, keyed_existing_state):
-    self.keyed_existing_state = keyed_existing_state
-
-  def get_keyed_state(self, key):
-    # TODO(ccy): consider implementing transactional copy on write semantics
-    # for state so that work items can be safely retried.
-    if not self.keyed_existing_state.get(key):
-      self.keyed_existing_state[key] = DirectUnmergedState()
-    return self.keyed_existing_state[key]

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index e70e326..86db291 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -20,7 +20,6 @@
 from __future__ import absolute_import
 
 import collections
-import itertools
 import logging
 import Queue
 import sys
@@ -222,30 +221,22 @@ class _CompletionCallback(object):
   or for a source transform.
   """
 
-  def __init__(self, evaluation_context, all_updates, timer_firings=None):
+  def __init__(self, evaluation_context, all_updates, timers=None):
     self._evaluation_context = evaluation_context
     self._all_updates = all_updates
-    self._timer_firings = timer_firings or []
+    self._timers = timers
 
-  def handle_result(self, transform_executor, input_committed_bundle,
-                    transform_result):
+  def handle_result(self, input_committed_bundle, transform_result):
     output_committed_bundles = self._evaluation_context.handle_result(
-        input_committed_bundle, self._timer_firings, transform_result)
+        input_committed_bundle, self._timers, transform_result)
     for output_committed_bundle in output_committed_bundles:
       self._all_updates.offer(_ExecutorServiceParallelExecutor._ExecutorUpdate(
-          transform_executor,
-          committed_bundle=output_committed_bundle))
-    for unprocessed_bundle in transform_result.unprocessed_bundles:
-      self._all_updates.offer(
-          _ExecutorServiceParallelExecutor._ExecutorUpdate(
-              transform_executor,
-              unprocessed_bundle=unprocessed_bundle))
+          output_committed_bundle, None))
     return output_committed_bundles
 
-  def handle_exception(self, transform_executor, exception):
+  def handle_exception(self, exception):
     self._all_updates.offer(
-        _ExecutorServiceParallelExecutor._ExecutorUpdate(
-            transform_executor, exception=exception))
+        _ExecutorServiceParallelExecutor._ExecutorUpdate(None, exception))
 
 
 class TransformExecutor(_ExecutorService.CallableTask):
@@ -259,13 +250,12 @@ class TransformExecutor(_ExecutorService.CallableTask):
   """
 
   def __init__(self, transform_evaluator_registry, evaluation_context,
-               input_bundle, fired_timers, applied_ptransform,
-               completion_callback, transform_evaluation_state):
+               input_bundle, applied_transform, completion_callback,
+               transform_evaluation_state):
     self._transform_evaluator_registry = transform_evaluator_registry
     self._evaluation_context = evaluation_context
     self._input_bundle = input_bundle
-    self._fired_timers = fired_timers
-    self._applied_ptransform = applied_ptransform
+    self._applied_transform = applied_transform
     self._completion_callback = completion_callback
     self._transform_evaluation_state = transform_evaluation_state
     self._side_input_values = {}
@@ -274,11 +264,11 @@ class TransformExecutor(_ExecutorService.CallableTask):
 
   def call(self):
     self._call_count += 1
-    assert self._call_count <= (1 + len(self._applied_ptransform.side_inputs))
-    metrics_container = MetricsContainer(self._applied_ptransform.full_label)
+    assert self._call_count <= (1 + len(self._applied_transform.side_inputs))
+    metrics_container = MetricsContainer(self._applied_transform.full_label)
     scoped_metrics_container = ScopedMetricsContainer(metrics_container)
 
-    for side_input in self._applied_ptransform.side_inputs:
+    for side_input in self._applied_transform.side_inputs:
       if side_input not in self._side_input_values:
         has_result, value = (
             self._evaluation_context.get_value_or_schedule_after_output(
@@ -290,17 +280,13 @@ class TransformExecutor(_ExecutorService.CallableTask):
         self._side_input_values[side_input] = value
 
     side_input_values = [self._side_input_values[side_input]
-                         for side_input in self._applied_ptransform.side_inputs]
+                         for side_input in self._applied_transform.side_inputs]
 
     try:
-      evaluator = self._transform_evaluator_registry.get_evaluator(
-          self._applied_ptransform, self._input_bundle,
+      evaluator = self._transform_evaluator_registry.for_application(
+          self._applied_transform, self._input_bundle,
           side_input_values, scoped_metrics_container)
 
-      if self._fired_timers:
-        for timer_firing in self._fired_timers:
-          evaluator.process_timer_wrapper(timer_firing)
-
       if self._input_bundle:
         for value in self._input_bundle.get_elements_iterable():
           evaluator.process_element(value)
@@ -312,18 +298,18 @@ class TransformExecutor(_ExecutorService.CallableTask):
       if self._evaluation_context.has_cache:
         for uncommitted_bundle in result.uncommitted_output_bundles:
           self._evaluation_context.append_to_cache(
-              self._applied_ptransform, uncommitted_bundle.tag,
+              self._applied_transform, uncommitted_bundle.tag,
               uncommitted_bundle.get_elements_iterable())
         undeclared_tag_values = result.undeclared_tag_values
         if undeclared_tag_values:
           for tag, value in undeclared_tag_values.iteritems():
             self._evaluation_context.append_to_cache(
-                self._applied_ptransform, tag, value)
+                self._applied_transform, tag, value)
 
-      self._completion_callback.handle_result(self, self._input_bundle, result)
+      self._completion_callback.handle_result(self._input_bundle, result)
       return result
     except Exception as e:  # pylint: disable=broad-except
-      self._completion_callback.handle_exception(self, e)
+      self._completion_callback.handle_exception(e)
     finally:
       self._evaluation_context.metrics().commit_physical(
           self._input_bundle,
@@ -367,15 +353,6 @@ class _ExecutorServiceParallelExecutor(object):
 
   def start(self, roots):
     self.root_nodes = frozenset(roots)
-    self.all_nodes = frozenset(
-        itertools.chain(
-            roots,
-            *itertools.chain(self.value_to_consumers.values())))
-    self.node_to_pending_bundles = {}
-    for root_node in self.root_nodes:
-      provider = (self.transform_evaluator_registry
-                  .get_root_bundle_provider(root_node))
-      self.node_to_pending_bundles[root_node] = provider.get_root_bundles()
     self.executor_service.submit(
         _ExecutorServiceParallelExecutor._MonitorTask(self))
 
@@ -392,30 +369,26 @@ class _ExecutorServiceParallelExecutor(object):
     if committed_bundle.pcollection in self.value_to_consumers:
       consumers = self.value_to_consumers[committed_bundle.pcollection]
       for applied_ptransform in consumers:
-        self.schedule_consumption(applied_ptransform, committed_bundle, [],
+        self.schedule_consumption(applied_ptransform, committed_bundle,
                                   self.default_completion_callback)
 
-  def schedule_unprocessed_bundle(self, applied_ptransform,
-                                  unprocessed_bundle):
-    self.node_to_pending_bundles[applied_ptransform].append(unprocessed_bundle)
-
-  def schedule_consumption(self, consumer_applied_ptransform, committed_bundle,
-                           fired_timers, on_complete):
+  def schedule_consumption(self, consumer_applied_transform, committed_bundle,
+                           on_complete):
     """Schedules evaluation of the given bundle with the transform."""
-    assert consumer_applied_ptransform
-    assert committed_bundle
-    assert on_complete
-    if self.transform_evaluator_registry.should_execute_serially(
-        consumer_applied_ptransform):
+    assert all([consumer_applied_transform, on_complete])
+    assert committed_bundle or consumer_applied_transform in self.root_nodes
+    if (committed_bundle
+        and self.transform_evaluator_registry.should_execute_serially(
+            consumer_applied_transform)):
       transform_executor_service = self.transform_executor_services.serial(
-          consumer_applied_ptransform)
+          consumer_applied_transform)
     else:
       transform_executor_service = self.transform_executor_services.parallel()
 
     transform_executor = TransformExecutor(
         self.transform_evaluator_registry, self.evaluation_context,
-        committed_bundle, fired_timers, consumer_applied_ptransform,
-        on_complete, transform_executor_service)
+        committed_bundle, consumer_applied_transform, on_complete,
+        transform_executor_service)
     transform_executor_service.schedule(transform_executor)
 
   class _TypedUpdateQueue(object):
@@ -445,16 +418,10 @@ class _ExecutorServiceParallelExecutor(object):
   class _ExecutorUpdate(object):
     """An internal status update on the state of the executor."""
 
-    def __init__(self, transform_executor, committed_bundle=None,
-                 unprocessed_bundle=None, exception=None):
-      self.transform_executor = transform_executor
+    def __init__(self, produced_bundle=None, exception=None):
       # Exactly one of them should be not-None
-      assert sum([
-          bool(committed_bundle),
-          bool(unprocessed_bundle),
-          bool(exception)]) == 1
-      self.committed_bundle = committed_bundle
-      self.unprocessed_bundle = unprocessed_bundle
+      assert bool(produced_bundle) != bool(exception)
+      self.committed_bundle = produced_bundle
       self.exception = exception
       self.exc_info = sys.exc_info()
       if self.exc_info[1] is not exception:
@@ -489,10 +456,6 @@ class _ExecutorServiceParallelExecutor(object):
         while update:
           if update.committed_bundle:
             self._executor.schedule_consumers(update.committed_bundle)
-          elif update.unprocessed_bundle:
-            self._executor.schedule_unprocessed_bundle(
-                update.transform_executor._applied_ptransform,
-                update.unprocessed_bundle)
           else:
             assert update.exception
             logging.warning('A task failed with exception.\n %s',
@@ -554,21 +517,19 @@ class _ExecutorServiceParallelExecutor(object):
       Returns:
         True if timers fired.
       """
-      transform_fired_timers = (
-          self._executor.evaluation_context.extract_fired_timers())
-      for applied_ptransform, fired_timers in transform_fired_timers:
+      fired_timers = self._executor.evaluation_context.extract_fired_timers()
+      for applied_ptransform in fired_timers:
         # Use an empty committed bundle. just to trigger.
         empty_bundle = (
             self._executor.evaluation_context.create_empty_committed_bundle(
                 applied_ptransform.inputs[0]))
         timer_completion_callback = _CompletionCallback(
             self._executor.evaluation_context, self._executor.all_updates,
-            timer_firings=fired_timers)
+            applied_ptransform)
 
         self._executor.schedule_consumption(
-            applied_ptransform, empty_bundle, fired_timers,
-            timer_completion_callback)
-      return bool(transform_fired_timers)
+            applied_ptransform, empty_bundle, timer_completion_callback)
+      return bool(fired_timers)
 
     def _is_executing(self):
       """Returns True if there is at least one non-blocked TransformExecutor."""
@@ -603,14 +564,10 @@ class _ExecutorServiceParallelExecutor(object):
         # additional work.
         return
 
-      # All current TransformExecutors are blocked; add more work from any
-      # pending bundles.
-      for applied_ptransform in self._executor.all_nodes:
-        if not self._executor.evaluation_context.is_done(applied_ptransform):
-          pending_bundles = self._executor.node_to_pending_bundles.get(
-              applied_ptransform, [])
-          for bundle in pending_bundles:
-            self._executor.schedule_consumption(
-                applied_ptransform, bundle, [],
-                self._executor.default_completion_callback)
-          self._executor.node_to_pending_bundles[applied_ptransform] = []
+      # All current TransformExecutors are blocked; add more work from the
+      # roots.
+      for applied_transform in self._executor.root_nodes:
+        if not self._executor.evaluation_context.is_done(applied_transform):
+          self._executor.schedule_consumption(
+              applied_transform, None,
+              self._executor.default_completion_callback)

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index cb2ace2..b1cb626 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -20,8 +20,6 @@
 from __future__ import absolute_import
 
 import collections
-import random
-import time
 
 from apache_beam import coders
 from apache_beam import pvalue
@@ -29,29 +27,16 @@ from apache_beam.internal import pickler
 import apache_beam.io as io
 from apache_beam.runners.common import DoFnRunner
 from apache_beam.runners.common import DoFnState
-from apache_beam.runners.direct.direct_runner import _StreamingGroupByKeyOnly
-from apache_beam.runners.direct.direct_runner import _StreamingGroupAlsoByWindow
 from apache_beam.runners.direct.watermark_manager import WatermarkManager
-from apache_beam.runners.direct.util import KeyedWorkItem
-from apache_beam.runners.direct.util import TransformResult
+from apache_beam.runners.direct.transform_result import TransformResult
 from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite  # pylint: disable=protected-access
-from apache_beam.testing.test_stream import TestStream
-from apache_beam.testing.test_stream import ElementEvent
-from apache_beam.testing.test_stream import WatermarkEvent
-from apache_beam.testing.test_stream import ProcessingTimeEvent
 from apache_beam.transforms import core
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.transforms.window import WindowedValue
-from apache_beam.transforms.trigger import create_trigger_driver
-from apache_beam.transforms.trigger import _CombiningValueStateTag
-from apache_beam.transforms.trigger import _ListStateTag
-from apache_beam.transforms.trigger import TimeDomain
 from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
 from apache_beam.typehints.typecheck import TypeCheckError
 from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn
 from apache_beam.utils import counters
-from apache_beam.utils.timestamp import Timestamp
-from apache_beam.utils.timestamp import MIN_TIMESTAMP
 from apache_beam.options.pipeline_options import TypeOptions
 
 
@@ -66,21 +51,13 @@ class TransformEvaluatorRegistry(object):
     self._evaluation_context = evaluation_context
     self._evaluators = {
         io.Read: _BoundedReadEvaluator,
-        io.ReadStringsFromPubSub: _PubSubReadEvaluator,
         core.Flatten: _FlattenEvaluator,
         core.ParDo: _ParDoEvaluator,
         core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
-        _StreamingGroupByKeyOnly: _StreamingGroupByKeyOnlyEvaluator,
-        _StreamingGroupAlsoByWindow: _StreamingGroupAlsoByWindowEvaluator,
         _NativeWrite: _NativeWriteEvaluator,
-        TestStream: _TestStreamEvaluator,
-    }
-    self._root_bundle_providers = {
-        core.PTransform: DefaultRootBundleProvider,
-        TestStream: _TestStreamRootBundleProvider,
     }
 
-  def get_evaluator(
+  def for_application(
       self, applied_ptransform, input_committed_bundle,
       side_inputs, scoped_metrics_container):
     """Returns a TransformEvaluator suitable for processing given inputs."""
@@ -102,18 +79,6 @@ class TransformEvaluatorRegistry(object):
                      input_committed_bundle, side_inputs,
                      scoped_metrics_container)
 
-  def get_root_bundle_provider(self, applied_ptransform):
-    provider_cls = None
-    for cls in applied_ptransform.transform.__class__.mro():
-      provider_cls = self._root_bundle_providers.get(cls)
-      if provider_cls:
-        break
-    if not provider_cls:
-      raise NotImplementedError(
-          'Root provider for [%s] not implemented in runner %s' % (
-              type(applied_ptransform.transform), self))
-    return provider_cls(self._evaluation_context, applied_ptransform)
-
   def should_execute_serially(self, applied_ptransform):
     """Returns True if this applied_ptransform should run one bundle at a time.
 
@@ -134,48 +99,7 @@ class TransformEvaluatorRegistry(object):
       True if executor should execute applied_ptransform serially.
     """
     return isinstance(applied_ptransform.transform,
-                      (core._GroupByKeyOnly,
-                       _StreamingGroupByKeyOnly,
-                       _StreamingGroupAlsoByWindow,
-                       _NativeWrite,))
-
-
-class RootBundleProvider(object):
-  """Provides bundles for the initial execution of a root transform."""
-
-  def __init__(self, evaluation_context, applied_ptransform):
-    self._evaluation_context = evaluation_context
-    self._applied_ptransform = applied_ptransform
-
-  def get_root_bundles(self):
-    raise NotImplementedError
-
-
-class DefaultRootBundleProvider(RootBundleProvider):
-  """Provides an empty bundle by default for root transforms."""
-
-  def get_root_bundles(self):
-    input_node = pvalue.PBegin(self._applied_ptransform.transform.pipeline)
-    empty_bundle = (
-        self._evaluation_context.create_empty_committed_bundle(input_node))
-    return [empty_bundle]
-
-
-class _TestStreamRootBundleProvider(RootBundleProvider):
-  """Provides an initial bundle for the TestStream evaluator."""
-
-  def get_root_bundles(self):
-    test_stream = self._applied_ptransform.transform
-    bundles = []
-    if len(test_stream.events) > 0:
-      bundle = self._evaluation_context.create_bundle(
-          pvalue.PBegin(self._applied_ptransform.transform.pipeline))
-      # Explicitly set timestamp to MIN_TIMESTAMP to ensure that we hold the
-      # watermark.
-      bundle.add(GlobalWindows.windowed_value(0, timestamp=MIN_TIMESTAMP))
-      bundle.commit(None)
-      bundles.append(bundle)
-    return bundles
+                      (core._GroupByKeyOnly, _NativeWrite))
 
 
 class _TransformEvaluator(object):
@@ -237,27 +161,6 @@ class _TransformEvaluator(object):
     """Starts a new bundle."""
     pass
 
-  def process_timer_wrapper(self, timer_firing):
-    """Process timer by clearing and then calling process_timer().
-
-    This method is called with any timer firing and clears the delivered
-    timer from the keyed state and then calls process_timer().  The default
-    process_timer() implementation emits a KeyedWorkItem for the particular
-    timer and passes it to process_element().  Evaluator subclasses which
-    desire different timer delivery semantics can override process_timer().
-    """
-    state = self.step_context.get_keyed_state(timer_firing.encoded_key)
-    state.clear_timer(
-        timer_firing.window, timer_firing.name, timer_firing.time_domain)
-    self.process_timer(timer_firing)
-
-  def process_timer(self, timer_firing):
-    """Default process_timer() impl. generating KeyedWorkItem element."""
-    self.process_element(
-        GlobalWindows.windowed_value(
-            KeyedWorkItem(timer_firing.encoded_key,
-                          timer_firings=[timer_firing])))
-
   def process_element(self, element):
     """Processes a new element as part of the current bundle."""
     raise NotImplementedError('%s do not process elements.', type(self))
@@ -275,6 +178,7 @@ class _BoundedReadEvaluator(_TransformEvaluator):
 
   def __init__(self, evaluation_context, applied_ptransform,
                input_committed_bundle, side_inputs, scoped_metrics_container):
+    assert not input_committed_bundle
     assert not side_inputs
     self._source = applied_ptransform.transform.source
     self._source.pipeline_options = evaluation_context.pipeline_options
@@ -303,148 +207,7 @@ class _BoundedReadEvaluator(_TransformEvaluator):
         bundles = _read_values_to_bundles(reader)
 
     return TransformResult(
-        self._applied_ptransform, bundles, [], None, None)
-
-
-class _TestStreamEvaluator(_TransformEvaluator):
-  """TransformEvaluator for the TestStream transform."""
-
-  def __init__(self, evaluation_context, applied_ptransform,
-               input_committed_bundle, side_inputs, scoped_metrics_container):
-    assert not side_inputs
-    self.test_stream = applied_ptransform.transform
-    super(_TestStreamEvaluator, self).__init__(
-        evaluation_context, applied_ptransform, input_committed_bundle,
-        side_inputs, scoped_metrics_container)
-
-  def start_bundle(self):
-    self.current_index = -1
-    self.watermark = MIN_TIMESTAMP
-    self.bundles = []
-
-  def process_element(self, element):
-    index = element.value
-    self.watermark = element.timestamp
-    assert isinstance(index, int)
-    assert 0 <= index <= len(self.test_stream.events)
-    self.current_index = index
-    event = self.test_stream.events[self.current_index]
-    if isinstance(event, ElementEvent):
-      assert len(self._outputs) == 1
-      output_pcollection = list(self._outputs)[0]
-      bundle = self._evaluation_context.create_bundle(output_pcollection)
-      for tv in event.timestamped_values:
-        bundle.output(
-            GlobalWindows.windowed_value(tv.value, timestamp=tv.timestamp))
-      self.bundles.append(bundle)
-    elif isinstance(event, WatermarkEvent):
-      assert event.new_watermark >= self.watermark
-      self.watermark = event.new_watermark
-    elif isinstance(event, ProcessingTimeEvent):
-      # TODO(ccy): advance processing time in the context's mock clock.
-      pass
-    else:
-      raise ValueError('Invalid TestStream event: %s.' % event)
-
-  def finish_bundle(self):
-    unprocessed_bundles = []
-    hold = None
-    if self.current_index < len(self.test_stream.events) - 1:
-      unprocessed_bundle = self._evaluation_context.create_bundle(
-          pvalue.PBegin(self._applied_ptransform.transform.pipeline))
-      unprocessed_bundle.add(GlobalWindows.windowed_value(
-          self.current_index + 1, timestamp=self.watermark))
-      unprocessed_bundles.append(unprocessed_bundle)
-      hold = self.watermark
-    return TransformResult(
-        self._applied_ptransform, self.bundles, unprocessed_bundles, None,
-        {None: hold})
-
-
-class _PubSubSubscriptionWrapper(object):
-  """Wrapper for garbage-collecting temporary PubSub subscriptions."""
-
-  def __init__(self, subscription, should_cleanup):
-    self.subscription = subscription
-    self.should_cleanup = should_cleanup
-
-  def __del__(self):
-    if self.should_cleanup:
-      self.subscription.delete()
-
-
-class _PubSubReadEvaluator(_TransformEvaluator):
-  """TransformEvaluator for PubSub read."""
-
-  _subscription_cache = {}
-
-  def __init__(self, evaluation_context, applied_ptransform,
-               input_committed_bundle, side_inputs, scoped_metrics_container):
-    assert not side_inputs
-    super(_PubSubReadEvaluator, self).__init__(
-        evaluation_context, applied_ptransform, input_committed_bundle,
-        side_inputs, scoped_metrics_container)
-
-    source = self._applied_ptransform.transform._source
-    self._subscription = _PubSubReadEvaluator.get_subscription(
-        self._applied_ptransform, source.project, source.topic_name,
-        source.subscription_name)
-
-  @classmethod
-  def get_subscription(cls, transform, project, topic, subscription_name):
-    if transform not in cls._subscription_cache:
-      from google.cloud import pubsub
-      should_create = not subscription_name
-      if should_create:
-        subscription_name = 'beam_%d_%x' % (
-            int(time.time()), random.randrange(1 << 32))
-      cls._subscription_cache[transform] = _PubSubSubscriptionWrapper(
-          pubsub.Client(project=project).topic(topic).subscription(
-              subscription_name),
-          should_create)
-      if should_create:
-        cls._subscription_cache[transform].subscription.create()
-    return cls._subscription_cache[transform].subscription
-
-  def start_bundle(self):
-    pass
-
-  def process_element(self, element):
-    pass
-
-  def _read_from_pubsub(self):
-    from google.cloud import pubsub
-    # Because of the AutoAck, we are not able to reread messages if this
-    # evaluator fails with an exception before emitting a bundle. However,
-    # the DirectRunner currently doesn't retry work items anyway, so the
-    # pipeline would enter an inconsistent state on any error.
-    with pubsub.subscription.AutoAck(
-        self._subscription, return_immediately=True,
-        max_messages=10) as results:
-      return [message.data for unused_ack_id, message in results.items()]
-
-  def finish_bundle(self):
-    data = self._read_from_pubsub()
-    if data:
-      output_pcollection = list(self._outputs)[0]
-      bundle = self._evaluation_context.create_bundle(output_pcollection)
-      # TODO(ccy): we currently do not use the PubSub message timestamp or
-      # respect the PubSub source's id_label field.
-      now = Timestamp.of(time.time())
-      for message_data in data:
-        bundle.output(GlobalWindows.windowed_value(message_data, timestamp=now))
-      bundles = [bundle]
-    else:
-      bundles = []
-    if self._applied_ptransform.inputs:
-      input_pvalue = self._applied_ptransform.inputs[0]
-    else:
-      input_pvalue = pvalue.PBegin(self._applied_ptransform.transform.pipeline)
-    unprocessed_bundle = self._evaluation_context.create_bundle(
-        input_pvalue)
-    return TransformResult(
-        self._applied_ptransform, bundles,
-        [unprocessed_bundle], None, {None: Timestamp.of(time.time())})
+        self._applied_ptransform, bundles, None, None, None, None)
 
 
 class _FlattenEvaluator(_TransformEvaluator):
@@ -468,7 +231,7 @@ class _FlattenEvaluator(_TransformEvaluator):
   def finish_bundle(self):
     bundles = [self.bundle]
     return TransformResult(
-        self._applied_ptransform, bundles, [], None, None)
+        self._applied_ptransform, bundles, None, None, None, None)
 
 
 class _TaggedReceivers(dict):
@@ -557,7 +320,7 @@ class _ParDoEvaluator(_TransformEvaluator):
     bundles = self._tagged_receivers.values()
     result_counters = self._counter_factory.get_counters()
     return TransformResult(
-        self._applied_ptransform, bundles, [], result_counters, None,
+        self._applied_ptransform, bundles, None, None, result_counters, None,
         self._tagged_receivers.undeclared_in_memory_tag_values)
 
 
@@ -565,8 +328,13 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
   """TransformEvaluator for _GroupByKeyOnly transform."""
 
   MAX_ELEMENT_PER_BUNDLE = None
-  ELEMENTS_TAG = _ListStateTag('elements')
-  COMPLETION_TAG = _CombiningValueStateTag('completed', any)
+
+  class _GroupByKeyOnlyEvaluatorState(object):
+
+    def __init__(self):
+      # output: {} key -> [values]
+      self.output = collections.defaultdict(list)
+      self.completed = False
 
   def __init__(self, evaluation_context, applied_ptransform,
                input_committed_bundle, side_inputs, scoped_metrics_container):
@@ -575,13 +343,15 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
         evaluation_context, applied_ptransform, input_committed_bundle,
         side_inputs, scoped_metrics_container)
 
+  @property
   def _is_final_bundle(self):
     return (self._execution_context.watermarks.input_watermark
             == WatermarkManager.WATERMARK_POS_INF)
 
   def start_bundle(self):
-    self.step_context = self._execution_context.get_step_context()
-    self.global_state = self.step_context.get_keyed_state(None)
+    self.state = (self._execution_context.existing_state
+                  if self._execution_context.existing_state
+                  else _GroupByKeyOnlyEvaluator._GroupByKeyOnlyEvaluatorState())
 
     assert len(self._outputs) == 1
     self.output_pcollection = list(self._outputs)[0]
@@ -591,44 +361,29 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
         self._applied_ptransform.transform.get_type_hints().input_types[0])
     self.key_coder = coders.registry.get_coder(kv_type_hint[0].tuple_types[0])
 
-  def process_timer(self, timer_firing):
-    # We do not need to emit a KeyedWorkItem to process_element().
-    pass
-
   def process_element(self, element):
-    assert not self.global_state.get_state(
-        None, _GroupByKeyOnlyEvaluator.COMPLETION_TAG)
+    assert not self.state.completed
     if (isinstance(element, WindowedValue)
         and isinstance(element.value, collections.Iterable)
         and len(element.value) == 2):
       k, v = element.value
-      encoded_k = self.key_coder.encode(k)
-      state = self.step_context.get_keyed_state(encoded_k)
-      state.add_state(None, _GroupByKeyOnlyEvaluator.ELEMENTS_TAG, v)
+      self.state.output[self.key_coder.encode(k)].append(v)
     else:
       raise TypeCheckError('Input to _GroupByKeyOnly must be a PCollection of '
                            'windowed key-value pairs. Instead received: %r.'
                            % element)
 
   def finish_bundle(self):
-    if self._is_final_bundle():
-      if self.global_state.get_state(
-          None, _GroupByKeyOnlyEvaluator.COMPLETION_TAG):
+    if self._is_final_bundle:
+      if self.state.completed:
         # Ignore empty bundles after emitting output. (This may happen because
         # empty bundles do not affect input watermarks.)
         bundles = []
       else:
-        gbk_result = []
-        # TODO(ccy): perhaps we can clean this up to not use this
-        # internal attribute of the DirectStepContext.
-        for encoded_k in self.step_context.keyed_existing_state:
-          # Ignore global state.
-          if encoded_k is None:
-            continue
-          k = self.key_coder.decode(encoded_k)
-          state = self.step_context.get_keyed_state(encoded_k)
-          vs = state.get_state(None, _GroupByKeyOnlyEvaluator.ELEMENTS_TAG)
-          gbk_result.append(GlobalWindows.windowed_value((k, vs)))
+        gbk_result = (
+            map(GlobalWindows.windowed_value, (
+                (self.key_coder.decode(k), v)
+                for k, v in self.state.output.iteritems())))
 
         def len_element_fn(element):
           _, v = element.value
@@ -638,139 +393,21 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
             self.output_pcollection, gbk_result,
             _GroupByKeyOnlyEvaluator.MAX_ELEMENT_PER_BUNDLE, len_element_fn)
 
-      self.global_state.add_state(
-          None, _GroupByKeyOnlyEvaluator.COMPLETION_TAG, True)
+      self.state.completed = True
+      state = self.state
       hold = WatermarkManager.WATERMARK_POS_INF
     else:
       bundles = []
+      state = self.state
       hold = WatermarkManager.WATERMARK_NEG_INF
-      self.global_state.set_timer(
-          None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF)
 
     return TransformResult(
-        self._applied_ptransform, bundles, [], None, {None: hold})
-
-
-class _StreamingGroupByKeyOnlyEvaluator(_TransformEvaluator):
-  """TransformEvaluator for _StreamingGroupByKeyOnly transform.
-
-  The _GroupByKeyOnlyEvaluator buffers elements until its input watermark goes
-  to infinity, which is suitable for batch mode execution. During streaming
-  mode execution, we emit each bundle as it comes to the next transform.
-  """
-
-  MAX_ELEMENT_PER_BUNDLE = None
-
-  def __init__(self, evaluation_context, applied_ptransform,
-               input_committed_bundle, side_inputs, scoped_metrics_container):
-    assert not side_inputs
-    super(_StreamingGroupByKeyOnlyEvaluator, self).__init__(
-        evaluation_context, applied_ptransform, input_committed_bundle,
-        side_inputs, scoped_metrics_container)
-
-  def start_bundle(self):
-    self.gbk_items = collections.defaultdict(list)
-
-    assert len(self._outputs) == 1
-    self.output_pcollection = list(self._outputs)[0]
-
-    # The input type of a GroupByKey will be KV[Any, Any] or more specific.
-    kv_type_hint = (
-        self._applied_ptransform.transform.get_type_hints().input_types[0])
-    self.key_coder = coders.registry.get_coder(kv_type_hint[0].tuple_types[0])
-
-  def process_element(self, element):
-    if (isinstance(element, WindowedValue)
-        and isinstance(element.value, collections.Iterable)
-        and len(element.value) == 2):
-      k, v = element.value
-      self.gbk_items[self.key_coder.encode(k)].append(v)
-    else:
-      raise TypeCheckError('Input to _GroupByKeyOnly must be a PCollection of '
-                           'windowed key-value pairs. Instead received: %r.'
-                           % element)
-
-  def finish_bundle(self):
-    bundles = []
-    bundle = None
-    for encoded_k, vs in self.gbk_items.iteritems():
-      if not bundle:
-        bundle = self._evaluation_context.create_bundle(
-            self.output_pcollection)
-        bundles.append(bundle)
-      kwi = KeyedWorkItem(encoded_k, elements=vs)
-      bundle.add(GlobalWindows.windowed_value(kwi))
-
-    return TransformResult(
-        self._applied_ptransform, bundles, [], None, None)
-
-
-class _StreamingGroupAlsoByWindowEvaluator(_TransformEvaluator):
-  """TransformEvaluator for the _StreamingGroupAlsoByWindow transform.
-
-  This evaluator is only used in streaming mode.  In batch mode, the
-  GroupAlsoByWindow operation is evaluated as a normal DoFn, as defined
-  in transforms/core.py.
-  """
-
-  def __init__(self, evaluation_context, applied_ptransform,
-               input_committed_bundle, side_inputs, scoped_metrics_container):
-    assert not side_inputs
-    super(_StreamingGroupAlsoByWindowEvaluator, self).__init__(
-        evaluation_context, applied_ptransform, input_committed_bundle,
-        side_inputs, scoped_metrics_container)
-
-  def start_bundle(self):
-    assert len(self._outputs) == 1
-    self.output_pcollection = list(self._outputs)[0]
-    self.step_context = self._execution_context.get_step_context()
-    self.driver = create_trigger_driver(
-        self._applied_ptransform.transform.windowing)
-    self.gabw_items = []
-    self.keyed_holds = {}
-
-    # The input type of a GroupAlsoByWindow will be KV[Any, Iter[Any]] or more
-    # specific.
-    kv_type_hint = (
-        self._applied_ptransform.transform.get_type_hints().input_types[0])
-    self.key_coder = coders.registry.get_coder(kv_type_hint[0].tuple_types[0])
-
-  def process_element(self, element):
-    kwi = element.value
-    assert isinstance(kwi, KeyedWorkItem), kwi
-    encoded_k, timer_firings, vs = (
-        kwi.encoded_key, kwi.timer_firings, kwi.elements)
-    k = self.key_coder.decode(encoded_k)
-    state = self.step_context.get_keyed_state(encoded_k)
-
-    for timer_firing in timer_firings:
-      for wvalue in self.driver.process_timer(
-          timer_firing.window, timer_firing.name, timer_firing.time_domain,
-          timer_firing.timestamp, state):
-        self.gabw_items.append(wvalue.with_value((k, wvalue.value)))
-    if vs:
-      for wvalue in self.driver.process_elements(state, vs, MIN_TIMESTAMP):
-        self.gabw_items.append(wvalue.with_value((k, wvalue.value)))
-
-    self.keyed_holds[encoded_k] = state.get_earliest_hold()
-
-  def finish_bundle(self):
-    bundles = []
-    if self.gabw_items:
-      bundle = self._evaluation_context.create_bundle(self.output_pcollection)
-      for item in self.gabw_items:
-        bundle.add(item)
-      bundles.append(bundle)
-
-    return TransformResult(
-        self._applied_ptransform, bundles, [], None, self.keyed_holds)
+        self._applied_ptransform, bundles, state, None, None, hold)
 
 
 class _NativeWriteEvaluator(_TransformEvaluator):
   """TransformEvaluator for _NativeWrite transform."""
 
-  ELEMENTS_TAG = _ListStateTag('elements')
-
   def __init__(self, evaluation_context, applied_ptransform,
                input_committed_bundle, side_inputs, scoped_metrics_container):
     assert not side_inputs
@@ -792,16 +429,12 @@ class _NativeWriteEvaluator(_TransformEvaluator):
             == WatermarkManager.WATERMARK_POS_INF)
 
   def start_bundle(self):
-    self.step_context = self._execution_context.get_step_context()
-    self.global_state = self.step_context.get_keyed_state(None)
-
-  def process_timer(self, timer_firing):
-    # We do not need to emit a KeyedWorkItem to process_element().
-    pass
+    # state: [values]
+    self.state = (self._execution_context.existing_state
+                  if self._execution_context.existing_state else [])
 
   def process_element(self, element):
-    self.global_state.add_state(
-        None, _NativeWriteEvaluator.ELEMENTS_TAG, element)
+    self.state.append(element)
 
   def finish_bundle(self):
     # finish_bundle will append incoming bundles in memory until all the bundles
@@ -811,21 +444,19 @@ class _NativeWriteEvaluator(_TransformEvaluator):
     # ignored and would not generate additional output files.
     # TODO(altay): Do not wait until the last bundle to write in a single shard.
     if self._is_final_bundle:
-      elements = self.global_state.get_state(
-          None, _NativeWriteEvaluator.ELEMENTS_TAG)
       if self._has_already_produced_output:
         # Ignore empty bundles that arrive after the output is produced.
-        assert elements == []
+        assert self.state == []
       else:
         self._sink.pipeline_options = self._evaluation_context.pipeline_options
         with self._sink.writer() as writer:
-          for v in elements:
+          for v in self.state:
             writer.Write(v.value)
+      state = None
       hold = WatermarkManager.WATERMARK_POS_INF
     else:
+      state = self.state
       hold = WatermarkManager.WATERMARK_NEG_INF
-      self.global_state.set_timer(
-          None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF)
 
     return TransformResult(
-        self._applied_ptransform, [], [], None, {None: hold})
+        self._applied_ptransform, [], state, None, None, hold)

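(For illustration: the restored _NativeWriteEvaluator buffers every element in evaluator state and only writes once the input watermark reaches positive infinity, i.e. on the final bundle. A rough standalone sketch of that buffering behaviour, with an illustrative writer callback rather than the real sink API:

POS_INF = float('inf')
NEG_INF = float('-inf')

class BufferingWriteEvaluator(object):
  """Illustrative sketch only; not the DirectRunner's _NativeWriteEvaluator."""

  def __init__(self, write_fn):
    self._write_fn = write_fn
    self.state = []                        # buffered elements across bundles

  def process_element(self, element):
    self.state.append(element)

  def finish_bundle(self, input_watermark):
    if input_watermark == POS_INF:         # final bundle: flush the buffer
      for v in self.state:
        self._write_fn(v)
      return None, POS_INF                 # drop state, release the hold
    return self.state, NEG_INF             # keep state, keep holding

written = []
ev = BufferingWriteEvaluator(written.append)
ev.process_element('a')
assert ev.finish_bundle(NEG_INF) == (['a'], NEG_INF)   # nothing written yet
ev.process_element('b')
assert ev.finish_bundle(POS_INF) == (None, POS_INF)
assert written == ['a', 'b']
)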
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/direct/transform_result.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_result.py b/sdks/python/apache_beam/runners/direct/transform_result.py
new file mode 100644
index 0000000..febdd20
--- /dev/null
+++ b/sdks/python/apache_beam/runners/direct/transform_result.py
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""The result of evaluating an AppliedPTransform with a TransformEvaluator."""
+
+from __future__ import absolute_import
+
+
+class TransformResult(object):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  The result of evaluating an AppliedPTransform with a TransformEvaluator."""
+
+  def __init__(self, applied_ptransform, uncommitted_output_bundles, state,
+               timer_update, counters, watermark_hold,
+               undeclared_tag_values=None):
+    self.transform = applied_ptransform
+    self.uncommitted_output_bundles = uncommitted_output_bundles
+    self.state = state
+    # TODO: timer update is currently unused.
+    self.timer_update = timer_update
+    self.counters = counters
+    self.watermark_hold = watermark_hold
+    # Only used when caching (materializing) all values is requested.
+    self.undeclared_tag_values = undeclared_tag_values
+    # Populated by the TransformExecutor.
+    self.logical_metric_updates = None

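(For illustration: with the positional signature restored above, evaluators construct results roughly as follows. The argument values here are placeholders, not real runner objects:

from apache_beam.runners.direct.transform_result import TransformResult

# Placeholder arguments; in the runner these are real transforms, bundles,
# evaluator state and counters.
result = TransformResult(
    applied_ptransform=None,
    uncommitted_output_bundles=[],
    state={'encoded-key': [1, 2]},   # opaque state carried to the next bundle
    timer_update=None,               # currently unused
    counters=None,
    watermark_hold=None)

assert result.uncommitted_output_bundles == []
assert result.logical_metric_updates is None  # set later by TransformExecutor
)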
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/direct/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/util.py b/sdks/python/apache_beam/runners/direct/util.py
deleted file mode 100644
index 10f7b29..0000000
--- a/sdks/python/apache_beam/runners/direct/util.py
+++ /dev/null
@@ -1,67 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Utility classes used by the DirectRunner.
-
-For internal use only. No backwards compatibility guarantees.
-"""
-
-from __future__ import absolute_import
-
-
-class TransformResult(object):
-  """Result of evaluating an AppliedPTransform with a TransformEvaluator."""
-
-  def __init__(self, applied_ptransform, uncommitted_output_bundles,
-               unprocessed_bundles, counters, keyed_watermark_holds,
-               undeclared_tag_values=None):
-    self.transform = applied_ptransform
-    self.uncommitted_output_bundles = uncommitted_output_bundles
-    self.unprocessed_bundles = unprocessed_bundles
-    self.counters = counters
-    # Mapping of key -> earliest hold timestamp or None.  Keys should be
-    # strings or None.
-    #
-    # For each key, we receive as its corresponding value the earliest
-    # watermark hold for that key (the key can be None for global state), past
-    # which the output watermark for the currently-executing step will not
-    # advance.  If the value is None or utils.timestamp.MAX_TIMESTAMP, the
-    # watermark hold will be removed.
-    self.keyed_watermark_holds = keyed_watermark_holds or {}
-    # Only used when caching (materializing) all values is requested.
-    self.undeclared_tag_values = undeclared_tag_values
-    # Populated by the TransformExecutor.
-    self.logical_metric_updates = None
-
-
-class TimerFiring(object):
-  """A single instance of a fired timer."""
-
-  def __init__(self, encoded_key, window, name, time_domain, timestamp):
-    self.encoded_key = encoded_key
-    self.window = window
-    self.name = name
-    self.time_domain = time_domain
-    self.timestamp = timestamp
-
-
-class KeyedWorkItem(object):
-  """A keyed item that can either be a timer firing or a list of elements."""
-  def __init__(self, encoded_key, timer_firings=None, elements=None):
-    self.encoded_key = encoded_key
-    self.timer_firings = timer_firings or []
-    self.elements = elements or []

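(For illustration: the deleted util.py documents the keyed_watermark_holds contract: for each key, the earliest hold past which the step's output watermark may not advance, with None or MAX_TIMESTAMP meaning the hold is released. A small standalone sketch of that update rule, using a float stand-in for the real timestamp type:

MAX_TIMESTAMP = float('inf')     # stand-in for utils.timestamp.MAX_TIMESTAMP

def merge_keyed_holds(current_holds, keyed_earliest_holds):
  """Illustrative merge of per-key watermark holds (not runner code)."""
  for key, hold in keyed_earliest_holds.items():
    current_holds[key] = hold
    if hold is None or hold == MAX_TIMESTAMP:
      # None or MAX_TIMESTAMP releases the hold for this key.
      del current_holds[key]
  return current_holds

holds = {}
merge_keyed_holds(holds, {'k1': 10, None: 5})     # None key == global state
merge_keyed_holds(holds, {'k1': MAX_TIMESTAMP})   # releases k1's hold
assert holds == {None: 5}
)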
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/direct/watermark_manager.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index 935998d..3a13539 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -23,10 +23,8 @@ import threading
 
 from apache_beam import pipeline
 from apache_beam import pvalue
-from apache_beam.runners.direct.util import TimerFiring
 from apache_beam.utils.timestamp import MAX_TIMESTAMP
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
-from apache_beam.utils.timestamp import TIME_GRANULARITY
 
 
 class WatermarkManager(object):
@@ -37,23 +35,21 @@ class WatermarkManager(object):
   WATERMARK_POS_INF = MAX_TIMESTAMP
   WATERMARK_NEG_INF = MIN_TIMESTAMP
 
-  def __init__(self, clock, root_transforms, value_to_consumers,
-               transform_keyed_states):
+  def __init__(self, clock, root_transforms, value_to_consumers):
     self._clock = clock  # processing time clock
-    self._root_transforms = root_transforms
     self._value_to_consumers = value_to_consumers
-    self._transform_keyed_states = transform_keyed_states
+    self._root_transforms = root_transforms
     # AppliedPTransform -> TransformWatermarks
     self._transform_to_watermarks = {}
 
     for root_transform in root_transforms:
       self._transform_to_watermarks[root_transform] = _TransformWatermarks(
-          self._clock, transform_keyed_states[root_transform], root_transform)
+          self._clock)
 
     for consumers in value_to_consumers.values():
       for consumer in consumers:
         self._transform_to_watermarks[consumer] = _TransformWatermarks(
-            self._clock, transform_keyed_states[consumer], consumer)
+            self._clock)
 
     for consumers in value_to_consumers.values():
       for consumer in consumers:
@@ -93,19 +89,16 @@ class WatermarkManager(object):
     return self._transform_to_watermarks[applied_ptransform]
 
   def update_watermarks(self, completed_committed_bundle, applied_ptransform,
-                        completed_timers, outputs, unprocessed_bundles,
-                        keyed_earliest_holds):
+                        timer_update, outputs, earliest_hold):
     assert isinstance(applied_ptransform, pipeline.AppliedPTransform)
     self._update_pending(
-        completed_committed_bundle, applied_ptransform, completed_timers,
-        outputs, unprocessed_bundles)
+        completed_committed_bundle, applied_ptransform, timer_update, outputs)
     tw = self.get_watermarks(applied_ptransform)
-    tw.hold(keyed_earliest_holds)
+    tw.hold(earliest_hold)
     self._refresh_watermarks(applied_ptransform)
 
   def _update_pending(self, input_committed_bundle, applied_ptransform,
-                      completed_timers, output_committed_bundles,
-                      unprocessed_bundles):
+                      timer_update, output_committed_bundles):
     """Updated list of pending bundles for the given AppliedPTransform."""
 
     # Update pending elements. Filter out empty bundles. They do not impact
@@ -119,10 +112,7 @@ class WatermarkManager(object):
             consumer_tw.add_pending(output)
 
     completed_tw = self._transform_to_watermarks[applied_ptransform]
-    completed_tw.update_timers(completed_timers)
-
-    for unprocessed_bundle in unprocessed_bundles:
-      completed_tw.add_pending(unprocessed_bundle)
+    completed_tw.update_timers(timer_update)
 
     assert input_committed_bundle or applied_ptransform in self._root_transforms
     if input_committed_bundle and input_committed_bundle.has_elements():
@@ -146,36 +136,33 @@ class WatermarkManager(object):
   def extract_fired_timers(self):
     all_timers = []
     for applied_ptransform, tw in self._transform_to_watermarks.iteritems():
-      fired_timers = tw.extract_fired_timers()
-      if fired_timers:
-        all_timers.append((applied_ptransform, fired_timers))
+      if tw.extract_fired_timers():
+        all_timers.append(applied_ptransform)
     return all_timers
 
 
 class _TransformWatermarks(object):
-  """Tracks input and output watermarks for an AppliedPTransform."""
+  """Tracks input and output watermarks for aan AppliedPTransform."""
 
-  def __init__(self, clock, keyed_states, transform):
+  def __init__(self, clock):
     self._clock = clock
-    self._keyed_states = keyed_states
     self._input_transform_watermarks = []
     self._input_watermark = WatermarkManager.WATERMARK_NEG_INF
     self._output_watermark = WatermarkManager.WATERMARK_NEG_INF
-    self._keyed_earliest_holds = {}
+    self._earliest_hold = WatermarkManager.WATERMARK_POS_INF
     self._pending = set()  # Scheduled bundles targeted for this transform.
-    self._fired_timers = set()
+    self._fired_timers = False
     self._lock = threading.Lock()
 
-    self._label = str(transform)
-
   def update_input_transform_watermarks(self, input_transform_watermarks):
     with self._lock:
       self._input_transform_watermarks = input_transform_watermarks
 
-  def update_timers(self, completed_timers):
+  def update_timers(self, timer_update):
     with self._lock:
-      for timer_firing in completed_timers:
-        self._fired_timers.remove(timer_firing)
+      if timer_update:
+        assert self._fired_timers
+        self._fired_timers = False
 
   @property
   def input_watermark(self):
@@ -187,13 +174,11 @@ class _TransformWatermarks(object):
     with self._lock:
       return self._output_watermark
 
-  def hold(self, keyed_earliest_holds):
+  def hold(self, value):
     with self._lock:
-      for key, hold_value in keyed_earliest_holds.iteritems():
-        self._keyed_earliest_holds[key] = hold_value
-        if (hold_value is None or
-            hold_value == WatermarkManager.WATERMARK_POS_INF):
-          del self._keyed_earliest_holds[key]
+      if value is None:
+        value = WatermarkManager.WATERMARK_POS_INF
+      self._earliest_hold = value
 
   def add_pending(self, pending):
     with self._lock:
@@ -208,22 +193,9 @@ class _TransformWatermarks(object):
 
   def refresh(self):
     with self._lock:
-      min_pending_timestamp = WatermarkManager.WATERMARK_POS_INF
-      has_pending_elements = False
-      for input_bundle in self._pending:
-        # TODO(ccy): we can have the Bundle class keep track of the minimum
-        # timestamp so we don't have to do an iteration here.
-        for wv in input_bundle.get_elements_iterable():
-          has_pending_elements = True
-          if wv.timestamp < min_pending_timestamp:
-            min_pending_timestamp = wv.timestamp
-
-      # If there is a pending element with a certain timestamp, we can at most
-      # advance our watermark to the maximum timestamp less than that
-      # timestamp.
-      pending_holder = WatermarkManager.WATERMARK_POS_INF
-      if has_pending_elements:
-        pending_holder = min_pending_timestamp - TIME_GRANULARITY
+      pending_holder = (WatermarkManager.WATERMARK_NEG_INF
+                        if self._pending else
+                        WatermarkManager.WATERMARK_POS_INF)
 
       input_watermarks = [
           tw.output_watermark for tw in self._input_transform_watermarks]
@@ -232,11 +204,7 @@ class _TransformWatermarks(object):
 
       self._input_watermark = max(self._input_watermark,
                                   min(pending_holder, producer_watermark))
-      earliest_hold = WatermarkManager.WATERMARK_POS_INF
-      for hold in self._keyed_earliest_holds.values():
-        if hold < earliest_hold:
-          earliest_hold = hold
-      new_output_watermark = min(self._input_watermark, earliest_hold)
+      new_output_watermark = min(self._input_watermark, self._earliest_hold)
 
       advanced = new_output_watermark > self._output_watermark
       self._output_watermark = new_output_watermark
@@ -251,12 +219,8 @@ class _TransformWatermarks(object):
       if self._fired_timers:
         return False
 
-      fired_timers = []
-      for encoded_key, state in self._keyed_states.iteritems():
-        timers = state.get_timers(watermark=self._input_watermark)
-        for expired in timers:
-          window, (name, time_domain, timestamp) = expired
-          fired_timers.append(
-              TimerFiring(encoded_key, window, name, time_domain, timestamp))
-      self._fired_timers.update(fired_timers)
-      return fired_timers
+      should_fire = (
+          self._earliest_hold < WatermarkManager.WATERMARK_POS_INF and
+          self._input_watermark == WatermarkManager.WATERMARK_POS_INF)
+      self._fired_timers = should_fire
+      return should_fire

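(For illustration: after the revert, refresh() no longer scans pending element timestamps; any pending bundle simply pins the input watermark at negative infinity. A condensed, standalone sketch of the restored computation, using floats in place of the runner's timestamp type:

POS_INF = float('inf')
NEG_INF = float('-inf')

def refresh(input_watermark, output_watermark, pending, producer_watermarks,
            earliest_hold):
  """Condensed, illustrative version of the restored refresh() logic."""
  # Any pending bundle pins the input watermark at negative infinity.
  pending_holder = NEG_INF if pending else POS_INF
  producer_watermark = min(producer_watermarks + [POS_INF])
  input_watermark = max(input_watermark,
                        min(pending_holder, producer_watermark))
  new_output_watermark = min(input_watermark, earliest_hold)
  advanced = new_output_watermark > output_watermark
  return input_watermark, new_output_watermark, advanced

# No pending bundles and no hold: both watermarks advance to the producer's.
assert refresh(NEG_INF, NEG_INF, set(), [42], POS_INF) == (42, 42, True)
# A pending bundle keeps the watermarks from advancing.
assert refresh(NEG_INF, NEG_INF, {'bundle'}, [42], POS_INF) == (
    NEG_INF, NEG_INF, False)
)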
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/python/apache_beam/runners/pipeline_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
index a40069b..1c89d06 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -24,8 +24,7 @@ For internal use only; no backwards-compatibility guarantees.
 from apache_beam import pipeline
 from apache_beam import pvalue
 from apache_beam import coders
-from apache_beam.portability.api import beam_fn_api_pb2
-from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.api import beam_runner_api_pb2
 from apache_beam.transforms import core
 
 
@@ -43,10 +42,9 @@ class _PipelineContextMap(object):
     self._id_to_proto = proto_map if proto_map else {}
     self._counter = 0
 
-  def _unique_ref(self, obj=None):
+  def _unique_ref(self):
     self._counter += 1
-    return "ref_%s_%s_%s" % (
-        self._obj_type.__name__, type(obj).__name__, self._counter)
+    return "ref_%s_%s" % (self._obj_type.__name__, self._counter)
 
   def populate_map(self, proto_map):
     for id, proto in self._id_to_proto.items():
@@ -54,7 +52,7 @@ class _PipelineContextMap(object):
 
   def get_id(self, obj):
     if obj not in self._obj_to_id:
-      id = self._unique_ref(obj)
+      id = self._unique_ref()
       self._id_to_obj[id] = obj
       self._obj_to_id[obj] = id
       self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
@@ -81,16 +79,11 @@ class PipelineContext(object):
       # TODO: environment
   }
 
-  def __init__(self, proto=None):
-    if isinstance(proto, beam_fn_api_pb2.ProcessBundleDescriptor):
-      proto = beam_runner_api_pb2.Components(
-          coders=dict(proto.coders.items()),
-          windowing_strategies=dict(proto.windowing_strategies.items()),
-          environments=dict(proto.environments.items()))
+  def __init__(self, context_proto=None):
     for name, cls in self._COMPONENT_TYPES.items():
       setattr(
           self, name, _PipelineContextMap(
-              self, cls, getattr(proto, name, None)))
+              self, cls, getattr(context_proto, name, None)))
 
   @staticmethod
   def from_runner_api(proto):

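(For illustration: the reverted _unique_ref drops the object's type name and keys ids purely on the component type plus a counter. A minimal sketch of the resulting id scheme, simplified to omit the proto round-trip and using hypothetical class names:

class _MiniContextMap(object):
  """Simplified stand-in for _PipelineContextMap (no proto serialization)."""

  def __init__(self, obj_type):
    self._obj_type = obj_type
    self._obj_to_id = {}
    self._counter = 0

  def _unique_ref(self):
    self._counter += 1
    return "ref_%s_%s" % (self._obj_type.__name__, self._counter)

  def get_id(self, obj):
    if obj not in self._obj_to_id:
      self._obj_to_id[obj] = self._unique_ref()
    return self._obj_to_id[obj]

class Coder(object):
  pass

coders = _MiniContextMap(Coder)
c = Coder()
assert coders.get_id(c) == 'ref_Coder_1'        # stable for the same object
assert coders.get_id(c) == 'ref_Coder_1'
assert coders.get_id(Coder()) == 'ref_Coder_2'  # new object, new ref
)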
