beam-commits mailing list archives

From da...@apache.org
Subject [09/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder
Date Tue, 14 Jun 2016 23:12:44 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/runners/dataflow_runner.py b/sdks/python/google/cloud/dataflow/runners/dataflow_runner.py
deleted file mode 100644
index 1c0c589..0000000
--- a/sdks/python/google/cloud/dataflow/runners/dataflow_runner.py
+++ /dev/null
@@ -1,639 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""A runner implementation that submits a job for remote execution.
-
-The runner will create a JSON description of the job graph and then submit it
-to the Dataflow Service for remote execution by a worker.
-"""
-
-import base64
-import logging
-import threading
-import time
-
-
-from google.cloud.dataflow import coders
-from google.cloud.dataflow import pvalue
-from google.cloud.dataflow.internal import pickler
-from google.cloud.dataflow.io import iobase
-from google.cloud.dataflow.pvalue import PCollectionView
-from google.cloud.dataflow.runners.runner import PipelineResult
-from google.cloud.dataflow.runners.runner import PipelineRunner
-from google.cloud.dataflow.runners.runner import PipelineState
-from google.cloud.dataflow.runners.runner import PValueCache
-from google.cloud.dataflow.typehints import typehints
-from google.cloud.dataflow.utils import names
-from google.cloud.dataflow.utils.names import PropertyNames
-from google.cloud.dataflow.utils.names import TransformNames
-from google.cloud.dataflow.utils.options import StandardOptions
-from google.cloud.dataflow.internal.clients import dataflow as dataflow_api
-
-
-class DataflowPipelineRunner(PipelineRunner):
-  """A runner that creates job graphs and submits them for remote execution.
-
-  Every execution of the run() method will submit an independent job for
-  remote execution consisting of the nodes reachable from the passed-in
-  node argument, or the entire graph if node is None. The run() method
-  returns once the service has created the job; it will not wait for the
-  job to finish unless blocking is set to True.
-  """
-
-  # Environment version information. It is passed to the service during
-  # job submission and is used by the service to establish what features
-  # are expected by the workers.
-  BATCH_ENVIRONMENT_MAJOR_VERSION = '4'
-  STREAMING_ENVIRONMENT_MAJOR_VERSION = '0'
-
-  def __init__(self, cache=None, blocking=False):
-    # Cache of CloudWorkflowStep protos generated while the runner
-    # "executes" a pipeline.
-    self._cache = cache if cache is not None else PValueCache()
-    self.blocking = blocking
-    self.result = None
-    self._unique_step_id = 0
-
-  def _get_unique_step_name(self):
-    self._unique_step_id += 1
-    return 's%s' % self._unique_step_id
-
-  @staticmethod
-  def poll_for_job_completion(runner, job_id):
-    """Polls for the specified job to finish running (successfully or not)."""
-    last_message_time = None
-    last_message_id = None
-
-    last_error_rank = float('-inf')
-    last_error_msg = None
-    last_job_state = None
-    # How long to wait after pipeline failure for the error
-    # message to show up giving the reason for the failure.
-    # It typically takes about 30 seconds.
-    final_countdown_timer_secs = 50.0
-    sleep_secs = 5.0
-    # Try to prioritize the user-level traceback, if any.
-    def rank_error(msg):
-      if 'work item was attempted' in msg:
-        return -1
-      elif 'Traceback' in msg:
-        return 1
-      else:
-        return 0
-
-    while True:
-      response = runner.dataflow_client.get_job(job_id)
-      # If get() is called very soon after Create() the response may not contain
-      # an initialized 'currentState' field.
-      if response.currentState is not None:
-        if response.currentState != last_job_state:
-          logging.info('Job %s is in state %s', job_id, response.currentState)
-          last_job_state = response.currentState
-        if str(response.currentState) != 'JOB_STATE_RUNNING':
-          # Stop checking for new messages on timeout, explanatory
-          # message received, success, or a terminal job state caused
-          # by the user that therefore doesn't require explanation.
-          if (final_countdown_timer_secs <= 0.0
-              or last_error_msg is not None
-              or str(response.currentState) == 'JOB_STATE_DONE'
-              or str(response.currentState) == 'JOB_STATE_CANCELLED'
-              or str(response.currentState) == 'JOB_STATE_UPDATED'
-              or str(response.currentState) == 'JOB_STATE_DRAINED'):
-            break
-          # The job has failed; ensure we see any final error messages.
-          sleep_secs = 1.0      # poll faster during the final countdown
-          final_countdown_timer_secs -= sleep_secs
-      time.sleep(sleep_secs)
-
-      # Get all messages since beginning of the job run or since last message.
-      page_token = None
-      while True:
-        messages, page_token = runner.dataflow_client.list_messages(
-            job_id, page_token=page_token, start_time=last_message_time)
-        for m in messages:
-          if last_message_id is not None and m.id == last_message_id:
-            # Skip the first message if it is the last message we got in the
-            # previous round. This can happen because we use the
-            # last_message_time as a parameter of the query for new messages.
-            continue
-          last_message_time = m.time
-          last_message_id = m.id
-          # Skip empty messages.
-          if m.messageImportance is None:
-            continue
-          logging.info(
-              '%s: %s: %s: %s', m.id, m.time, m.messageImportance,
-              m.messageText)
-          if str(m.messageImportance) == 'JOB_MESSAGE_ERROR':
-            if rank_error(m.messageText) >= last_error_rank:
-              last_error_rank = rank_error(m.messageText)
-              last_error_msg = m.messageText
-        if not page_token:
-          break
-
-    runner.result = DataflowPipelineResult(response)
-    runner.last_error_msg = last_error_msg
-
-  def run(self, pipeline):
-    """Remotely executes entire pipeline or parts reachable from node."""
-    # Import here to avoid adding the dependency for local running scenarios.
-    # pylint: disable=g-import-not-at-top
-    from google.cloud.dataflow.internal import apiclient
-    self.job = apiclient.Job(pipeline.options)
-    # The superclass's run will trigger a traversal of all reachable nodes.
-    super(DataflowPipelineRunner, self).run(pipeline)
-    # Get a Dataflow API client and submit the job.
-    standard_options = pipeline.options.view_as(StandardOptions)
-    if standard_options.streaming:
-      job_version = DataflowPipelineRunner.STREAMING_ENVIRONMENT_MAJOR_VERSION
-    else:
-      job_version = DataflowPipelineRunner.BATCH_ENVIRONMENT_MAJOR_VERSION
-    self.dataflow_client = apiclient.DataflowApplicationClient(
-        pipeline.options, job_version)
-    self.result = DataflowPipelineResult(
-        self.dataflow_client.create_job(self.job))
-
-    if self.blocking:
-      thread = threading.Thread(
-          target=DataflowPipelineRunner.poll_for_job_completion,
-          args=(self, self.result.job_id()))
-      # Mark the thread as a daemon thread so a keyboard interrupt on the main
-      # thread will terminate everything. This is also the reason we will not
-      # use thread.join() to wait for the polling thread.
-      thread.daemon = True
-      thread.start()
-      while thread.isAlive():
-        time.sleep(5.0)
-      if self.result.current_state() != PipelineState.DONE:
-        raise DataflowRuntimeException(
-            'Dataflow pipeline failed:\n%s'
-            % getattr(self, 'last_error_msg', None), self.result)
-    return self.result
-
-  def _get_typehint_based_encoding(self, typehint, window_coder):
-    """Returns an encoding based on a typehint object."""
-    return self._get_cloud_encoding(self._get_coder(typehint,
-                                                    window_coder=window_coder))
-
-  def _get_coder(self, typehint, window_coder):
-    """Returns a coder based on a typehint object."""
-    if window_coder:
-      return coders.WindowedValueCoder(
-          coders.registry.get_coder(typehint),
-          coders.TimestampCoder(),
-          window_coder)
-    else:
-      return coders.registry.get_coder(typehint)
-
-  def _get_cloud_encoding(self, coder):
-    """Returns an encoding based on a coder object."""
-    if not isinstance(coder, coders.Coder):
-      raise TypeError('Coder object must inherit from coders.Coder: %s.' %
-                      str(coder))
-    return coder.as_cloud_object()
-
-  def _get_side_input_encoding(self, input_encoding):
-    """Returns an encoding for the output of a view transform.
-
-    Args:
-      input_encoding: encoding of current transform's input. Side inputs need
-        this because the service will check that input and output types match.
-
-    Returns:
-      An encoding that matches the output and input encoding. This is essential
-      for the View transforms introduced to produce side inputs to a ParDo.
-    """
-    return {
-        '@type': input_encoding['@type'],
-        'component_encodings': [input_encoding]
-    }
-
-  def _get_encoded_output_coder(self, transform_node, window_value=True):
-    """Returns the cloud encoding of the coder for the output of a transform."""
-    if (len(transform_node.outputs) == 1
-        and transform_node.outputs[0].element_type is not None):
-      # TODO(robertwb): Handle type hints for multi-output transforms.
-      element_type = transform_node.outputs[0].element_type
-    else:
-      # TODO(silviuc): Remove this branch (and assert) when typehints are
-      # propagated everywhere. Returning an 'Any' as type hint will trigger
-      # usage of the fallback coder (i.e., cPickler).
-      element_type = typehints.Any
-    if window_value:
-      window_coder = (
-          transform_node.outputs[0].windowing.windowfn.get_window_coder())
-    else:
-      window_coder = None
-    return self._get_typehint_based_encoding(
-        element_type, window_coder=window_coder)
-
-  def _add_step(self, step_kind, step_label, transform_node, side_tags=()):
-    """Creates a Step object and adds it to the cache."""
-    # Import here to avoid adding the dependency for local running scenarios.
-    # pylint: disable=g-import-not-at-top
-    from google.cloud.dataflow.internal import apiclient
-    step = apiclient.Step(step_kind, self._get_unique_step_name())
-    self.job.proto.steps.append(step.proto)
-    step.add_property(PropertyNames.USER_NAME, step_label)
-    # Cache the node/step association for the main output of the transform node.
-    self._cache.cache_output(transform_node, None, step)
-    # If side_tags is not () then this is a multi-output transform node and we
-    # need to cache the (node, tag, step) for each of the tags used to access
-    # the outputs. This is essential because the keys used to search in the
-    # cache always contain the tag.
-    for tag in side_tags:
-      self._cache.cache_output(transform_node, tag, step)
-    return step
-
-  def run_Create(self, transform_node):
-    transform = transform_node.transform
-    step = self._add_step(TransformNames.CREATE_PCOLLECTION,
-                          transform_node.full_label, transform_node)
-    # TODO(silviuc): Eventually use a coder based on typecoders.
-    # Note that we base64-encode values here so that the service will accept
-    # the values.
-    element_coder = coders.PickleCoder()
-    step.add_property(
-        PropertyNames.ELEMENT,
-        [base64.b64encode(element_coder.encode(v))
-         for v in transform.value])
-    # The service expects a WindowedValueCoder here, so we wrap the actual
-    # encoding in a WindowedValueCoder.
-    step.encoding = self._get_cloud_encoding(
-        coders.WindowedValueCoder(element_coder))
-    step.add_property(
-        PropertyNames.OUTPUT_INFO,
-        [{PropertyNames.USER_NAME: (
-            '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
-          PropertyNames.ENCODING: step.encoding,
-          PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
-
-  def run_CreatePCollectionView(self, transform_node):
-    step = self._add_step(TransformNames.COLLECTION_TO_SINGLETON,
-                          transform_node.full_label, transform_node)
-    input_tag = transform_node.inputs[0].tag
-    input_step = self._cache.get_pvalue(transform_node.inputs[0])
-    step.add_property(
-        PropertyNames.PARALLEL_INPUT,
-        {'@type': 'OutputReference',
-         PropertyNames.STEP_NAME: input_step.proto.name,
-         PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
-    step.encoding = self._get_side_input_encoding(input_step.encoding)
-    step.add_property(
-        PropertyNames.OUTPUT_INFO,
-        [{PropertyNames.USER_NAME: (
-            '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
-          PropertyNames.ENCODING: step.encoding,
-          PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
-
-  def run_Flatten(self, transform_node):
-    step = self._add_step(TransformNames.FLATTEN,
-                          transform_node.full_label, transform_node)
-    inputs = []
-    for one_input in transform_node.inputs:
-      input_step = self._cache.get_pvalue(one_input)
-      inputs.append(
-          {'@type': 'OutputReference',
-           PropertyNames.STEP_NAME: input_step.proto.name,
-           PropertyNames.OUTPUT_NAME: input_step.get_output(one_input.tag)})
-    step.add_property(PropertyNames.INPUTS, inputs)
-    step.encoding = self._get_encoded_output_coder(transform_node)
-    step.add_property(
-        PropertyNames.OUTPUT_INFO,
-        [{PropertyNames.USER_NAME: (
-            '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
-          PropertyNames.ENCODING: step.encoding,
-          PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
-
-  def apply_GroupByKey(self, transform, pcoll):
-    coder = self._get_coder(pcoll.element_type or typehints.Any, None)
-    if not coder.is_kv_coder():
-      raise ValueError(('Coder for the GroupByKey operation "%s" is not a '
-                        'key-value coder: %s.') % (transform.label,
-                                                   coder))
-    # TODO(robertwb): Update the coder itself if it changed.
-    coders.registry.verify_deterministic(
-        coder.key_coder(), 'GroupByKey operation "%s"' % transform.label)
-
-    return pvalue.PCollection(pcoll.pipeline)
-
-  def run_GroupByKey(self, transform_node):
-    input_tag = transform_node.inputs[0].tag
-    input_step = self._cache.get_pvalue(transform_node.inputs[0])
-    step = self._add_step(
-        TransformNames.GROUP, transform_node.full_label, transform_node)
-    step.add_property(
-        PropertyNames.PARALLEL_INPUT,
-        {'@type': 'OutputReference',
-         PropertyNames.STEP_NAME: input_step.proto.name,
-         PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
-    step.encoding = self._get_encoded_output_coder(transform_node)
-    step.add_property(
-        PropertyNames.OUTPUT_INFO,
-        [{PropertyNames.USER_NAME: (
-            '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
-          PropertyNames.ENCODING: step.encoding,
-          PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
-    windowing = transform_node.transform.get_windowing(
-        transform_node.inputs)
-    step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(windowing))
-
-  def run_ParDo(self, transform_node):
-    transform = transform_node.transform
-    input_tag = transform_node.inputs[0].tag
-    input_step = self._cache.get_pvalue(transform_node.inputs[0])
-
-    # Attach side inputs.
-    si_dict = {}
-    si_tags_and_types = []
-    for side_pval in transform_node.side_inputs:
-      assert isinstance(side_pval, PCollectionView)
-      side_input_step = self._cache.get_pvalue(side_pval)
-      si_label = side_input_step.step_name
-      si_dict[si_label] = {
-          '@type': 'OutputReference',
-          PropertyNames.STEP_NAME: si_label,
-          PropertyNames.OUTPUT_NAME: PropertyNames.OUT}
-      # The label for the side input step will appear as a 'tag' property for
-      # the side input source specification. Its type (singleton or iterator)
-      # also determines whether the entire source or just the first element
-      # is read.
-      si_tags_and_types.append((si_label, side_pval.__class__,
-                                side_pval._view_options()))  # pylint: disable=protected-access
-
-    # Now create the step for the ParDo transform being handled.
-    step = self._add_step(
-        TransformNames.DO, transform_node.full_label, transform_node,
-        transform_node.transform.side_output_tags)
-    fn_data = (transform.fn, transform.args, transform.kwargs,
-               si_tags_and_types, transform_node.inputs[0].windowing)
-    step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(fn_data))
-    step.add_property(
-        PropertyNames.PARALLEL_INPUT,
-        {'@type': 'OutputReference',
-         PropertyNames.STEP_NAME: input_step.proto.name,
-         PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
-    # Add side inputs if any.
-    step.add_property(PropertyNames.NON_PARALLEL_INPUTS, si_dict)
-
-    # Generate description for main output and side outputs. The output names
-    # will be 'out' for main output and 'out_<tag>' for a tagged output.
-    # Using 'out' as a tag will not clash with the name for main since it will
-    # be transformed into 'out_out' internally.
-    outputs = []
-    step.encoding = self._get_encoded_output_coder(transform_node)
-
-    # Add the main output to the description.
-    outputs.append(
-        {PropertyNames.USER_NAME: (
-            '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
-         PropertyNames.ENCODING: step.encoding,
-         PropertyNames.OUTPUT_NAME: PropertyNames.OUT})
-    for side_tag in transform.side_output_tags:
-      # The assumption here is that side outputs will have the same typehint
-      # and coder as the main output. This is certainly the case right now
-      # but conceivably it could change in the future.
-      outputs.append(
-          {PropertyNames.USER_NAME: (
-              '%s.%s' % (transform_node.full_label, side_tag)),
-           PropertyNames.ENCODING: step.encoding,
-           PropertyNames.OUTPUT_NAME: (
-               '%s_%s' % (PropertyNames.OUT, side_tag))})
-    step.add_property(PropertyNames.OUTPUT_INFO, outputs)
-
-  def apply_CombineValues(self, transform, pcoll):
-    return pvalue.PCollection(pcoll.pipeline)
-
-  def run_CombineValues(self, transform_node):
-    transform = transform_node.transform
-    input_tag = transform_node.inputs[0].tag
-    input_step = self._cache.get_pvalue(transform_node.inputs[0])
-    step = self._add_step(
-        TransformNames.COMBINE, transform_node.full_label, transform_node)
-    # Combiner functions do not take deferred side inputs (i.e. PValues) and
-    # therefore the code to handle extra args/kwargs is simpler than for the
-    # DoFns of the ParDo transform. The last (empty) element of fn_data is
-    # where side input information would otherwise go.
-    fn_data = (transform.fn, transform.args, transform.kwargs, ())
-    step.add_property(PropertyNames.SERIALIZED_FN,
-                      pickler.dumps(fn_data))
-    step.add_property(
-        PropertyNames.PARALLEL_INPUT,
-        {'@type': 'OutputReference',
-         PropertyNames.STEP_NAME: input_step.proto.name,
-         PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
-    # Note that the accumulator must not have a WindowedValue encoding, while
-    # the output of this step does in fact have a WindowedValue encoding.
-    accumulator_encoding = self._get_encoded_output_coder(transform_node,
-                                                          window_value=False)
-    output_encoding = self._get_encoded_output_coder(transform_node)
-
-    step.encoding = output_encoding
-    step.add_property(PropertyNames.ENCODING, accumulator_encoding)
-    # Generate description for main output 'out.'
-    outputs = []
-    # Add the main output to the description.
-    outputs.append(
-        {PropertyNames.USER_NAME: (
-            '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
-         PropertyNames.ENCODING: step.encoding,
-         PropertyNames.OUTPUT_NAME: PropertyNames.OUT})
-    step.add_property(PropertyNames.OUTPUT_INFO, outputs)
-
-  def run_Read(self, transform_node):
-    transform = transform_node.transform
-    step = self._add_step(
-        TransformNames.READ, transform_node.full_label, transform_node)
-    # TODO(mairbek): refactor if-else tree to use registerable functions.
-    # Initialize the source specific properties.
-
-    if not hasattr(transform.source, 'format'):
-      # If a format is not set, we assume the source to be a custom source.
-      source_dict = dict()
-      spec_dict = dict()
-
-      spec_dict[names.SERIALIZED_SOURCE_KEY] = pickler.dumps(transform.source)
-      spec_dict['@type'] = names.SOURCE_TYPE
-      source_dict['spec'] = spec_dict
-      step.add_property(PropertyNames.SOURCE_STEP_INPUT,
-                        source_dict)
-    elif transform.source.format == 'text':
-      step.add_property(PropertyNames.FILE_PATTERN, transform.source.path)
-    elif transform.source.format == 'bigquery':
-      # TODO(silviuc): Add table validation if transform.source.validate.
-      if transform.source.table_reference is not None:
-        step.add_property(PropertyNames.BIGQUERY_DATASET,
-                          transform.source.table_reference.datasetId)
-        step.add_property(PropertyNames.BIGQUERY_TABLE,
-                          transform.source.table_reference.tableId)
-        # If the project owning the table was not specified, the project
-        # owning the workflow (the current project) will be used.
-        if transform.source.table_reference.projectId is not None:
-          step.add_property(PropertyNames.BIGQUERY_PROJECT,
-                            transform.source.table_reference.projectId)
-      elif transform.source.query is not None:
-        step.add_property(PropertyNames.BIGQUERY_QUERY, transform.source.query)
-      else:
-        raise ValueError('BigQuery source %r must specify either a table or'
-                         ' a query' % transform.source)
-    elif transform.source.format == 'pubsub':
-      standard_options = (
-          transform_node.inputs[0].pipeline.options.view_as(StandardOptions))
-      if not standard_options.streaming:
-        raise ValueError('PubSubSource is currently available for use only in '
-                         'streaming pipelines.')
-      step.add_property(PropertyNames.PUBSUB_TOPIC, transform.source.topic)
-      if transform.source.subscription:
-        step.add_property(PropertyNames.PUBSUB_SUBSCRIPTION,
-                          transform.source.subscription)
-      if transform.source.id_label:
-        step.add_property(PropertyNames.PUBSUB_ID_LABEL,
-                          transform.source.id_label)
-    else:
-      raise ValueError(
-          'Source %r has unexpected format %s.' % (
-              transform.source, transform.source.format))
-
-    if not hasattr(transform.source, 'format'):
-      step.add_property(PropertyNames.FORMAT, names.SOURCE_FORMAT)
-    else:
-      step.add_property(PropertyNames.FORMAT, transform.source.format)
-
-    if isinstance(transform.source, iobase.BoundedSource):
-      coder = transform.source.default_output_coder()
-    else:
-      coder = transform.source.coder
-
-    step.encoding = self._get_cloud_encoding(coder)
-    step.add_property(
-        PropertyNames.OUTPUT_INFO,
-        [{PropertyNames.USER_NAME: (
-            '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
-          PropertyNames.ENCODING: step.encoding,
-          PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
-
-  def run__NativeWrite(self, transform_node):
-    transform = transform_node.transform
-    input_tag = transform_node.inputs[0].tag
-    input_step = self._cache.get_pvalue(transform_node.inputs[0])
-    step = self._add_step(
-        TransformNames.WRITE, transform_node.full_label, transform_node)
-    # TODO(mairbek): refactor if-else tree to use registerable functions.
-    # Initialize the sink specific properties.
-    if transform.sink.format == 'text':
-      # Note that it is important to use typed properties (@type/value dicts)
-      # for non-string properties and also for potentially empty strings. In
-      # the code below, num_shards needs a type, as do file_name_suffix and
-      # shard_name_template (either of which could be an empty string).
-      step.add_property(
-          PropertyNames.FILE_NAME_PREFIX, transform.sink.file_name_prefix,
-          with_type=True)
-      step.add_property(
-          PropertyNames.FILE_NAME_SUFFIX, transform.sink.file_name_suffix,
-          with_type=True)
-      step.add_property(
-          PropertyNames.SHARD_NAME_TEMPLATE, transform.sink.shard_name_template,
-          with_type=True)
-      if transform.sink.num_shards > 0:
-        step.add_property(
-            PropertyNames.NUM_SHARDS, transform.sink.num_shards, with_type=True)
-      # TODO(silviuc): Implement sink validation.
-      step.add_property(PropertyNames.VALIDATE_SINK, False, with_type=True)
-    elif transform.sink.format == 'bigquery':
-      # TODO(silviuc): Add table validation if transform.sink.validate.
-      step.add_property(PropertyNames.BIGQUERY_DATASET,
-                        transform.sink.table_reference.datasetId)
-      step.add_property(PropertyNames.BIGQUERY_TABLE,
-                        transform.sink.table_reference.tableId)
-      # If the project owning the table was not specified, the project
-      # owning the workflow (the current project) will be used.
-      if transform.sink.table_reference.projectId is not None:
-        step.add_property(PropertyNames.BIGQUERY_PROJECT,
-                          transform.sink.table_reference.projectId)
-      step.add_property(PropertyNames.BIGQUERY_CREATE_DISPOSITION,
-                        transform.sink.create_disposition)
-      step.add_property(PropertyNames.BIGQUERY_WRITE_DISPOSITION,
-                        transform.sink.write_disposition)
-      if transform.sink.table_schema is not None:
-        step.add_property(
-            PropertyNames.BIGQUERY_SCHEMA, transform.sink.schema_as_json())
-    elif transform.sink.format == 'pubsub':
-      standard_options = (
-          transform_node.inputs[0].pipeline.options.view_as(StandardOptions))
-      if not standard_options.streaming:
-        raise ValueError('PubSubSink is currently available for use only in '
-                         'streaming pipelines.')
-      step.add_property(PropertyNames.PUBSUB_TOPIC, transform.sink.topic)
-    else:
-      raise ValueError(
-          'Sink %r has unexpected format %s.' % (
-              transform.sink, transform.sink.format))
-    step.add_property(PropertyNames.FORMAT, transform.sink.format)
-    step.encoding = self._get_cloud_encoding(transform.sink.coder)
-    step.add_property(PropertyNames.ENCODING, step.encoding)
-    step.add_property(
-        PropertyNames.PARALLEL_INPUT,
-        {'@type': 'OutputReference',
-         PropertyNames.STEP_NAME: input_step.proto.name,
-         PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
-
-
-class DataflowPipelineResult(PipelineResult):
-  """Represents the state of a pipeline run on the Dataflow service."""
-
-  def __init__(self, job):
-    """Job is a Job message from the Dataflow API."""
-    self._job = job
-
-  def job_id(self):
-    return self._job.id
-
-  def current_state(self):
-    """Return the current state of the remote job.
-
-    Returns:
-      A PipelineState object.
-    """
-    values_enum = dataflow_api.Job.CurrentStateValueValuesEnum
-    api_jobstate_map = {
-        values_enum.JOB_STATE_UNKNOWN: PipelineState.UNKNOWN,
-        values_enum.JOB_STATE_STOPPED: PipelineState.STOPPED,
-        values_enum.JOB_STATE_RUNNING: PipelineState.RUNNING,
-        values_enum.JOB_STATE_DONE: PipelineState.DONE,
-        values_enum.JOB_STATE_FAILED: PipelineState.FAILED,
-        values_enum.JOB_STATE_CANCELLED: PipelineState.CANCELLED,
-        values_enum.JOB_STATE_UPDATED: PipelineState.UPDATED,
-        values_enum.JOB_STATE_DRAINING: PipelineState.DRAINING,
-        values_enum.JOB_STATE_DRAINED: PipelineState.DRAINED,
-    }
-
-    return (api_jobstate_map[self._job.currentState] if self._job.currentState
-            else PipelineState.UNKNOWN)
-
-  def __str__(self):
-    return '<%s %s %s>' % (
-        self.__class__.__name__,
-        self.job_id(),
-        self.current_state())
-
-  def __repr__(self):
-    return '<%s %s at %s>' % (self.__class__.__name__, self._job, hex(id(self)))
-
-
-class DataflowRuntimeException(Exception):
-  """Indicates an error has occurred in running this pipeline."""
-
-  def __init__(self, msg, result):
-    super(DataflowRuntimeException, self).__init__(msg)
-    self.result = result
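
For context, a minimal sketch of how this runner was driven in the pre-rename google.cloud.dataflow API, modeled on the runner_test.py diff later in this commit. The option values (job name, project, GCS paths) are placeholders, and calling runner.run(p) directly mirrors what the test does; the usual Pipeline.run() entry point is not shown in this diff.

    from google.cloud.dataflow.pipeline import Pipeline
    from google.cloud.dataflow.runners import DataflowPipelineRunner
    import google.cloud.dataflow.transforms as ptransform
    from google.cloud.dataflow.utils.options import PipelineOptions

    # blocking=True makes run() poll the service until a terminal job state.
    runner = DataflowPipelineRunner(blocking=True)
    p = Pipeline(runner,
                 options=PipelineOptions([
                     '--job_name=example-job',
                     '--project=example-project',
                     '--staging_location=gs://example-bucket/staging',
                     '--temp_location=gs://example-bucket/temp',
                 ]))
    (p | ptransform.Create('create', [1, 2, 3])
       | ptransform.FlatMap('pair', lambda x: [(x, x)])
       | ptransform.GroupByKey('gbk'))
    result = runner.run(p)  # submits the JSON job graph; may raise DataflowRuntimeException
    print result.current_state()  # a PipelineState value, e.g. PipelineState.DONE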

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/runners/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/runners/direct_runner.py b/sdks/python/google/cloud/dataflow/runners/direct_runner.py
deleted file mode 100644
index 97de2c5..0000000
--- a/sdks/python/google/cloud/dataflow/runners/direct_runner.py
+++ /dev/null
@@ -1,326 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""DirectPipelineRunner, executing on the local machine.
-
-The DirectPipelineRunner class implements what is called in Dataflow
-parlance the "direct runner". Such a runner executes the entire graph
-of transformations belonging to a pipeline on the local machine.
-"""
-
-from __future__ import absolute_import
-
-import collections
-import itertools
-import logging
-
-from google.cloud.dataflow import coders
-from google.cloud.dataflow import error
-from google.cloud.dataflow.io import fileio
-from google.cloud.dataflow.io import iobase
-from google.cloud.dataflow.pvalue import DictPCollectionView
-from google.cloud.dataflow.pvalue import EmptySideInput
-from google.cloud.dataflow.pvalue import IterablePCollectionView
-from google.cloud.dataflow.pvalue import ListPCollectionView
-from google.cloud.dataflow.pvalue import SingletonPCollectionView
-from google.cloud.dataflow.runners.common import DoFnRunner
-from google.cloud.dataflow.runners.common import DoFnState
-from google.cloud.dataflow.runners.runner import PipelineResult
-from google.cloud.dataflow.runners.runner import PipelineRunner
-from google.cloud.dataflow.runners.runner import PipelineState
-from google.cloud.dataflow.runners.runner import PValueCache
-from google.cloud.dataflow.transforms import DoFnProcessContext
-from google.cloud.dataflow.transforms.window import GlobalWindows
-from google.cloud.dataflow.transforms.window import WindowedValue
-from google.cloud.dataflow.typehints.typecheck import OutputCheckWrapperDoFn
-from google.cloud.dataflow.typehints.typecheck import TypeCheckError
-from google.cloud.dataflow.typehints.typecheck import TypeCheckWrapperDoFn
-from google.cloud.dataflow.utils import counters
-from google.cloud.dataflow.utils.options import TypeOptions
-
-
-class DirectPipelineRunner(PipelineRunner):
-  """A local pipeline runner.
-
-  The runner computes everything locally and does not make any attempt to
-  optimize for time or space.
-  """
-
-  def __init__(self, cache=None):
-    # Cache of values computed while the runner executes a pipeline.
-    self._cache = cache if cache is not None else PValueCache()
-    self._counter_factory = counters.CounterFactory()
-    # Element counts used for debugging footprint issues in the direct runner.
-    # The values computed are used only for logging and do not take part in
-    # any decision making logic. The key for the counter dictionary is either
-    # the full label for the transform producing the elements or a tuple
-    # (full label, output tag) for ParDo transforms since they can output values
-    # on multiple outputs.
-    self.debug_counters = {}
-    self.debug_counters['element_counts'] = collections.Counter()
-
-  @property
-  def cache(self):
-    return self._cache
-
-  def get_pvalue(self, pvalue):
-    """Gets the PValue's computed value from the runner's cache."""
-    try:
-      return self._cache.get_pvalue(pvalue)
-    except KeyError:
-      raise error.PValueError('PValue is not computed.')
-
-  def clear_pvalue(self, pvalue):
-    """Removes a PValue from the runner's cache."""
-    self._cache.clear_pvalue(pvalue)
-
-  def skip_if_cached(func):  # pylint: disable=no-self-argument
-    """Decorator to skip execution of a transform if value is cached."""
-
-    def func_wrapper(self, pvalue, *args, **kwargs):
-      logging.debug('Current: Debug counters: %s', self.debug_counters)
-      if self._cache.is_cached(pvalue):  # pylint: disable=protected-access
-        return
-      else:
-        func(self, pvalue, *args, **kwargs)
-    return func_wrapper
-
-  def run(self, pipeline):
-    super(DirectPipelineRunner, self).run(pipeline)
-    logging.info('Final: Debug counters: %s', self.debug_counters)
-    return DirectPipelineResult(state=PipelineState.DONE,
-                                counter_factory=self._counter_factory)
-
-  @skip_if_cached
-  def run_CreatePCollectionView(self, transform_node):
-    transform = transform_node.transform
-    view = transform.view
-    values = self._cache.get_pvalue(transform_node.inputs[0])
-    if isinstance(view, SingletonPCollectionView):
-      has_default, default_value = view._view_options()  # pylint: disable=protected-access
-      if len(values) == 0:  # pylint: disable=g-explicit-length-test
-        if has_default:
-          result = default_value
-        else:
-          result = EmptySideInput()
-      elif len(values) == 1:
-        # TODO(ccy): Figure out whether side inputs should ever be given as
-        # windowed values
-        result = values[0].value
-      else:
-        raise ValueError(('PCollection with more than one element accessed as '
-                          'a singleton view: %s.') % view)
-    elif isinstance(view, IterablePCollectionView):
-      result = [v.value for v in values]
-    elif isinstance(view, ListPCollectionView):
-      result = [v.value for v in values]
-    elif isinstance(view, DictPCollectionView):
-      result = dict(v.value for v in values)
-    else:
-      raise NotImplementedError
-
-    self._cache.cache_output(transform_node, result)
-
-  @skip_if_cached
-  def run_ParDo(self, transform_node):
-    transform = transform_node.transform
-    # TODO(gildea): what is the appropriate object to attach the state to?
-    context = DoFnProcessContext(label=transform.label,
-                                 state=DoFnState(self._counter_factory))
-
-    side_inputs = [self._cache.get_pvalue(view)
-                   for view in transform_node.side_inputs]
-
-    # TODO(robertwb): Do this type checking inside DoFnRunner to get it on
-    # remote workers as well?
-    options = transform_node.inputs[0].pipeline.options
-    if options is not None and options.view_as(TypeOptions).runtime_type_check:
-      transform.dofn = TypeCheckWrapperDoFn(
-          transform.dofn, transform.get_type_hints())
-
-    # TODO(robertwb): Should this be conditionally done on the workers as well?
-    transform.dofn = OutputCheckWrapperDoFn(
-        transform.dofn, transform_node.full_label)
-
-    class RecordingReceiverSet(object):
-
-      def __init__(self, tag):
-        self.tag = tag
-
-      def output(self, element):
-        results[self.tag].append(element)
-
-    class TaggedReceivers(dict):
-
-      def __missing__(self, key):
-        return RecordingReceiverSet(key)
-
-    results = collections.defaultdict(list)
-    # Some tags may be empty.
-    for tag in transform.side_output_tags:
-      results[tag] = []
-
-    runner = DoFnRunner(transform.dofn, transform.args, transform.kwargs,
-                        side_inputs, transform_node.inputs[0].windowing,
-                        context, TaggedReceivers(),
-                        step_name=transform_node.full_label)
-    runner.start()
-    for v in self._cache.get_pvalue(transform_node.inputs[0]):
-      runner.process(v)
-    runner.finish()
-
-    self._cache.cache_output(transform_node, [])
-    for tag, value in results.items():
-      self.debug_counters['element_counts'][
-          (transform_node.full_label, tag)] += len(value)
-      self._cache.cache_output(transform_node, tag, value)
-
-  @skip_if_cached
-  def run_GroupByKeyOnly(self, transform_node):
-    result_dict = collections.defaultdict(list)
-    # The input type of a GroupByKey will be KV[Any, Any] or more specific.
-    kv_type_hint = transform_node.transform.get_type_hints().input_types[0]
-    key_coder = coders.registry.get_coder(kv_type_hint[0].tuple_types[0])
-
-    for wv in self._cache.get_pvalue(transform_node.inputs[0]):
-      if (isinstance(wv, WindowedValue) and
-          isinstance(wv.value, collections.Iterable) and len(wv.value) == 2):
-        k, v = wv.value
-        # We use a string encoding of the key object as the key, to support
-        # keys based on custom classes. This also mimics the remote execution
-        # behavior, where key objects are encoded before being written to the
-        # shuffle system responsible for grouping.
-        result_dict[key_coder.encode(k)].append(v)
-      else:
-        raise TypeCheckError('Input to GroupByKeyOnly must be a PCollection of '
-                             'windowed key-value pairs. Instead received: %r.'
-                             % wv)
-
-    gbk_result = map(
-        GlobalWindows.windowed_value,
-        ((key_coder.decode(k), v) for k, v in result_dict.iteritems()))
-    self.debug_counters['element_counts'][
-        transform_node.full_label] += len(gbk_result)
-    self._cache.cache_output(transform_node, gbk_result)
-
-  @skip_if_cached
-  def run_Create(self, transform_node):
-    transform = transform_node.transform
-    create_result = [GlobalWindows.windowed_value(v) for v in transform.value]
-    self.debug_counters['element_counts'][
-        transform_node.full_label] += len(create_result)
-    self._cache.cache_output(transform_node, create_result)
-
-  @skip_if_cached
-  def run_Flatten(self, transform_node):
-    flatten_result = list(
-        itertools.chain.from_iterable(
-            self._cache.get_pvalue(pc) for pc in transform_node.inputs))
-    self.debug_counters['element_counts'][
-        transform_node.full_label] += len(flatten_result)
-    self._cache.cache_output(transform_node, flatten_result)
-
-  @skip_if_cached
-  def run_Read(self, transform_node):
-    # TODO(chamikara) Implement a more generic way for passing PipelineOptions
-    # to sources and sinks when using DirectRunner.
-    source = transform_node.transform.source
-    source.pipeline_options = transform_node.inputs[0].pipeline.options
-
-    def read_values(reader):
-      read_result = [GlobalWindows.windowed_value(e) for e in reader]
-      self.debug_counters['element_counts'][
-          transform_node.full_label] += len(read_result)
-      self._cache.cache_output(transform_node, read_result)
-
-    if isinstance(source, iobase.BoundedSource):
-      reader = source.read(None)
-      read_values(reader)
-    else:
-      with source.reader() as reader:
-        read_values(reader)
-
-  @skip_if_cached
-  def run__NativeWrite(self, transform_node):
-    sink = transform_node.transform.sink
-    if isinstance(sink, fileio.NativeTextFileSink):
-      assert sink.num_shards in (0, 1)
-      if sink.shard_name_template:
-        sink.file_path += '-00000-of-00001'
-      sink.file_path += sink.file_name_suffix
-    sink.pipeline_options = transform_node.inputs[0].pipeline.options
-    with sink.writer() as writer:
-      for v in self._cache.get_pvalue(transform_node.inputs[0]):
-        self.debug_counters['element_counts'][transform_node.full_label] += 1
-        writer.Write(v.value)
-
-
-class DirectPipelineResult(PipelineResult):
-  """A DirectPipelineResult provides access to info about a pipeline."""
-
-  def __init__(self, state, counter_factory=None):
-    super(DirectPipelineResult, self).__init__(state)
-    self._counter_factory = counter_factory
-
-  def aggregated_values(self, aggregator_or_name):
-    return self._counter_factory.get_aggregator_values(aggregator_or_name)
-
-
-class EagerPipelineRunner(DirectPipelineRunner):
-
-  is_eager = True
-
-  def __init__(self):
-    super(EagerPipelineRunner, self).__init__()
-    self._seen_transforms = set()
-
-  def run_transform(self, transform):
-    if transform not in self._seen_transforms:
-      self._seen_transforms.add(transform)
-      super(EagerPipelineRunner, self).run_transform(transform)
-
-
-class DiskCachedPipelineRunner(DirectPipelineRunner):
-  """A DirectPipelineRunner that uses a disk backed cache.
-
-  DiskCachedPipelineRunner uses a temporary disk-backed cache for running
-  pipelines. This allows running pipelines that require more memory than is
-  available, at the cost of a performance penalty due to disk IO.
-
-  The memory requirement for DiskCachedPipelineRunner is approximately capped
-  by the single transform in the pipeline that consumes and outputs the largest
-  total collection (i.e. inputs, side inputs and outputs in aggregate). In the
-  extreme case where a transform uses all previous intermediate values as
-  input, the memory requirements of DiskCachedPipelineRunner are the same as
-  those of DirectPipelineRunner.
-  """
-
-  def __init__(self):
-    self._null_cache = ()
-    super(DiskCachedPipelineRunner, self).__init__(self._null_cache)
-
-  def run(self, pipeline):
-    try:
-      self._cache = PValueCache(use_disk_backed_cache=True)
-      return super(DirectPipelineRunner, self).run(pipeline)
-    finally:
-      del self._cache
-      self._cache = self._null_cache
-
-  @property
-  def cache(self):
-    raise NotImplementedError(
-        'DiskCachedPipelineRunner does not keep cache outside the scope of its '
-        'run method.')
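
As a rough illustration of the local execution and caching behavior above, a run with the direct runner might look like the following sketch. Passing the runner as the first Pipeline argument follows the usage in runner_test.py; constructing the Pipeline without explicit options, and the exact shape of the cached values returned by get_pvalue (plain elements vs. WindowedValue wrappers), are assumptions.

    from google.cloud.dataflow.pipeline import Pipeline
    from google.cloud.dataflow.runners.direct_runner import DirectPipelineRunner
    import google.cloud.dataflow.transforms as ptransform

    runner = DirectPipelineRunner()
    p = Pipeline(runner)
    squares = (p | ptransform.Create('create', [1, 2, 3])
                 | ptransform.FlatMap('square', lambda x: [x * x]))
    result = runner.run(p)            # visits every node and caches each output
    print result.current_state()      # PipelineState.DONE
    print runner.get_pvalue(squares)  # cached output of the 'square' ParDo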

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/runners/runner.py b/sdks/python/google/cloud/dataflow/runners/runner.py
deleted file mode 100644
index 6d171be..0000000
--- a/sdks/python/google/cloud/dataflow/runners/runner.py
+++ /dev/null
@@ -1,305 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""PipelineRunner, an abstract base runner object."""
-
-from __future__ import absolute_import
-
-import logging
-import os
-import shelve
-import shutil
-import tempfile
-
-
-def create_runner(runner_name):
-  """Creates a runner instance from a runner class name.
-
-  Args:
-    runner_name: Name of the pipeline runner. Possible values are:
-      DirectPipelineRunner, DiskCachedPipelineRunner, EagerPipelineRunner,
-      DataflowPipelineRunner and BlockingDataflowPipelineRunner.
-
-  Returns:
-    A runner object.
-
-  Raises:
-    RuntimeError: if an invalid runner name is used.
-  """
-  # pylint: disable=g-import-not-at-top
-  if runner_name == 'DirectPipelineRunner':
-    import google.cloud.dataflow.runners.direct_runner
-    return google.cloud.dataflow.runners.direct_runner.DirectPipelineRunner()
-  if runner_name == 'DiskCachedPipelineRunner':
-    import google.cloud.dataflow.runners.direct_runner
-    return google.cloud.dataflow.runners.direct_runner.DiskCachedPipelineRunner(
-    )
-  if runner_name == 'EagerPipelineRunner':
-    import google.cloud.dataflow.runners.direct_runner
-    return google.cloud.dataflow.runners.direct_runner.EagerPipelineRunner()
-  elif runner_name in ('DataflowPipelineRunner',
-                       'BlockingDataflowPipelineRunner'):
-    import google.cloud.dataflow.runners.dataflow_runner
-    return google.cloud.dataflow.runners.dataflow_runner.DataflowPipelineRunner(
-        blocking=runner_name == 'BlockingDataflowPipelineRunner')
-  else:
-    raise RuntimeError(
-        'Unexpected pipeline runner: %s. Valid values are '
-        'DirectPipelineRunner, DiskCachedPipelineRunner, EagerPipelineRunner, '
-        'DataflowPipelineRunner, or BlockingDataflowPipelineRunner.'
-        % runner_name)
-
-
-class PipelineRunner(object):
-  """A runner of a pipeline object.
-
-  The base runner provides a run() method for visiting every node in the
-  pipeline's DAG and executing the transforms computing the PValue in the node.
-  It also provides a clear() method for visiting every node and clearing out
-  the values contained in PValue objects produced during a run.
-
-  A custom runner will typically provide implementations for some of the
-  transform methods (ParDo, GroupByKey, Create, etc.). It may also
-  provide a new implementation for clear_pvalue(), which is used to wipe out
-  materialized values in order to reduce footprint.
-  """
-
-  def run(self, pipeline):
-    """Execute the entire pipeline or the sub-DAG reachable from a node."""
-
-    # Imported here to avoid circular dependencies.
-    # pylint: disable=g-import-not-at-top
-    from google.cloud.dataflow.pipeline import PipelineVisitor
-
-    class RunVisitor(PipelineVisitor):
-
-      def __init__(self, runner):
-        self.runner = runner
-
-      def visit_transform(self, transform_node):
-        try:
-          self.runner.run_transform(transform_node)
-        except:
-          logging.error('Error while visiting %s', transform_node.full_label)
-          raise
-
-    pipeline.visit(RunVisitor(self))
-
-  def clear(self, pipeline, node=None):
-    """Clear all nodes or nodes reachable from node of materialized values.
-
-    Args:
-      pipeline: Pipeline object containing PValues to be cleared.
-      node: Optional node in the Pipeline processing DAG. If specified only
-        nodes reachable from this node will be cleared (ancestors of the node).
-
-    This method is not intended (for now) to be called by users of Runner
-    objects. It is a hook for future layers on top of the current programming
-    model to control how much of the previously computed values are kept
-    around. Presumably an interactivity layer will use it. The simplest way
-    to change the behavior would be to define a runner that overrides the
-    clear_pvalue() method since this method (runner.clear) will visit all
-    relevant nodes and call clear_pvalue on them.
-
-    """
-
-    # Imported here to avoid circular dependencies.
-    # pylint: disable=g-import-not-at-top
-    from google.cloud.dataflow.pipeline import PipelineVisitor
-
-    class ClearVisitor(PipelineVisitor):
-
-      def __init__(self, runner):
-        self.runner = runner
-
-      def visit_value(self, value, _):
-        self.runner.clear_pvalue(value)
-
-    pipeline.visit(ClearVisitor(self), node=node)
-
-  def apply(self, transform, input):
-    """Runner callback for a pipeline.apply call.
-
-    Args:
-      transform: the transform to apply.
-      input: transform's input (typically a PCollection).
-
-    A concrete implementation of the Runner class may want to do custom
-    pipeline construction for a given transform.  To override the behavior
-    for a transform class Xyz, implement an apply_Xyz method with this same
-    signature.
-    """
-    for cls in transform.__class__.mro():
-      m = getattr(self, 'apply_%s' % cls.__name__, None)
-      if m:
-        return m(transform, input)
-    raise NotImplementedError(
-        'Execution of [%s] not implemented in runner %s.' % (transform, self))
-
-  def apply_PTransform(self, transform, input):
-    # The base case of apply is to call the transform's apply.
-    return transform.apply(input)
-
-  def run_transform(self, transform_node):
-    """Runner callback for a pipeline.run call.
-
-    Args:
-      transform_node: transform node for the transform to run.
-
-    A concrete implementation of the Runner class must implement run_Abc for
-    some class Abc in the method resolution order for every non-composite
-    transform Xyz in the pipeline.
-    """
-    for cls in transform_node.transform.__class__.mro():
-      m = getattr(self, 'run_%s' % cls.__name__, None)
-      if m:
-        return m(transform_node)
-    raise NotImplementedError(
-        'Execution of [%s] not implemented in runner %s.' % (
-            transform_node.transform, self))
-
-
-class PValueCache(object):
-  """Local cache for arbitrary information computed for PValue objects."""
-
-  def __init__(self, use_disk_backed_cache=False):
-    # Cache of values computed while a runner executes a pipeline. This is a
-    # dictionary of PValues and their computed values. Note that in principle
-    # the runner could contain PValues from several pipelines without clashes
-    # since a PValue is associated with one and only one pipeline. The keys of
-    # the dictionary are tuples of a PValue instance address (obtained using
-    # id()) and a tag name, converted to strings.
-
-    self._use_disk_backed_cache = use_disk_backed_cache
-    if use_disk_backed_cache:
-      self._tempdir = tempfile.mkdtemp()
-      self._cache = shelve.open(os.path.join(self._tempdir, 'shelve'))
-    else:
-      self._cache = {}
-
-  def __del__(self):
-    if self._use_disk_backed_cache:
-      self._cache.close()
-      shutil.rmtree(self._tempdir)
-
-  def __len__(self):
-    return len(self._cache)
-
-  def to_cache_key(self, transform, tag):
-    return str((id(transform), tag))
-
-  def _ensure_pvalue_has_real_producer(self, pvalue):
-    """Ensure the passed-in PValue has the real_producer attribute.
-
-    Args:
-      pvalue: A PValue instance whose cached value is requested.
-
-    During the runner's execution only the results of the primitive transforms
-    are cached. Whenever we are looking for a PValue that is the output of a
-    composite transform we need to find the output of its rightmost transform
-    part.
-    """
-    if not hasattr(pvalue, 'real_producer'):
-      real_producer = pvalue.producer
-      while real_producer.parts:
-        real_producer = real_producer.parts[-1]
-      pvalue.real_producer = real_producer
-
-  def is_cached(self, pobj):
-    from google.cloud.dataflow.pipeline import AppliedPTransform
-    if isinstance(pobj, AppliedPTransform):
-      transform = pobj
-      tag = None
-    else:
-      self._ensure_pvalue_has_real_producer(pobj)
-      transform = pobj.real_producer
-      tag = pobj.tag
-    return self.to_cache_key(transform, tag) in self._cache
-
-  def cache_output(self, transform, tag_or_value, value=None):
-    if value is None:
-      value = tag_or_value
-      tag = None
-    else:
-      tag = tag_or_value
-    self._cache[
-        self.to_cache_key(transform, tag)] = [value, transform.refcounts[tag]]
-
-  def get_pvalue(self, pvalue):
-    """Gets the value associated with a PValue from the cache."""
-    self._ensure_pvalue_has_real_producer(pvalue)
-    try:
-      value_with_refcount = self._cache[self.key(pvalue)]
-      value_with_refcount[1] -= 1
-      logging.debug('PValue computed by %s (tag %s): refcount: %d => %d',
-                    pvalue.real_producer.full_label, self.key(pvalue)[1],
-                    value_with_refcount[1] + 1, value_with_refcount[1])
-      if value_with_refcount[1] <= 0:
-        self.clear_pvalue(pvalue)
-      return value_with_refcount[0]
-    except KeyError:
-      if (pvalue.tag is not None
-          and self.to_cache_key(pvalue.real_producer, None) in self._cache):
-        # This is an undeclared, empty side output of a DoFn executed
-        # in the local runner before this side output was referenced.
-        return []
-      else:
-        raise
-
-  def get_unwindowed_pvalue(self, pvalue):
-    return [v.value for v in self.get_pvalue(pvalue)]
-
-  def clear_pvalue(self, pvalue):
-    """Removes a PValue from the cache."""
-    if self.is_cached(pvalue):
-      del self._cache[self.key(pvalue)]
-
-  def key(self, pobj):
-    self._ensure_pvalue_has_real_producer(pobj)
-    return self.to_cache_key(pobj.real_producer, pobj.tag)
-
-
-class PipelineState(object):
-  """State of the Pipeline, as returned by PipelineResult.current_state().
-
-  This is meant to be the union of all the states any runner can put a
-  pipeline in.  Currently, it represents the values of the dataflow
-  API JobState enum.
-  """
-  UNKNOWN = 'UNKNOWN'  # not specified
-  STOPPED = 'STOPPED'  # paused or not yet started
-  RUNNING = 'RUNNING'  # currently running
-  DONE = 'DONE'  # successfully completed (terminal state)
-  FAILED = 'FAILED'  # failed (terminal state)
-  CANCELLED = 'CANCELLED'  # explicitly cancelled (terminal state)
-  UPDATED = 'UPDATED'  # replaced by another job (terminal state)
-  DRAINING = 'DRAINING'  # still processing, no longer reading data
-  DRAINED = 'DRAINED'  # draining completed (terminal state)
-
-
-class PipelineResult(object):
-  """A PipelineResult provides access to info about a pipeline."""
-
-  def __init__(self, state):
-    self._state = state
-
-  def current_state(self):
-    """Return the current state of running the pipeline."""
-    return self._state
-
-  # pylint: disable=unused-argument
-  def aggregated_values(self, aggregator_or_name):
-    """Return a dict of step names to values of the Aggregator."""
-    logging.warn('%s does not implement aggregated_values',
-                 self.__class__.__name__)
-    return {}
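
A minimal sketch of the run_Xyz dispatch implemented by run_transform() above: a toy runner only needs to define methods named after the transform classes it supports. The runner class name and the transform class names used here (Create, ParDo) are purely illustrative.

    from google.cloud.dataflow.runners.runner import PipelineRunner, PValueCache

    class LoggingRunner(PipelineRunner):
      """Toy runner: run() visits each node, and run_transform() looks up a
      run_<ClassName> method along the transform's method resolution order."""

      def __init__(self):
        self._cache = PValueCache()

      def run_Create(self, transform_node):
        # Invoked for any transform whose class (or a base class) is named 'Create'.
        print 'Create: %s' % transform_node.full_label

      def run_ParDo(self, transform_node):
        print 'ParDo: %s' % transform_node.full_label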

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/runners/runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/runners/runner_test.py b/sdks/python/google/cloud/dataflow/runners/runner_test.py
deleted file mode 100644
index 67a6bc1..0000000
--- a/sdks/python/google/cloud/dataflow/runners/runner_test.py
+++ /dev/null
@@ -1,66 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for the PipelineRunner and DirectPipelineRunner classes.
-
-Note that PipelineRunner and DirectPipelineRunner functionality is tested in all
-the other unit tests. In this file we choose to test only aspects related to
-caching and clearing values that are not tested elsewhere.
-"""
-
-import unittest
-
-from google.cloud.dataflow.internal import apiclient
-from google.cloud.dataflow.pipeline import Pipeline
-from google.cloud.dataflow.runners import create_runner
-from google.cloud.dataflow.runners import DataflowPipelineRunner
-from google.cloud.dataflow.runners import DirectPipelineRunner
-import google.cloud.dataflow.transforms as ptransform
-from google.cloud.dataflow.utils.options import PipelineOptions
-
-
-class RunnerTest(unittest.TestCase):
-
-  def test_create_runner(self):
-    self.assertTrue(
-        isinstance(create_runner('DirectPipelineRunner'), DirectPipelineRunner))
-    self.assertTrue(
-        isinstance(create_runner('DataflowPipelineRunner'),
-                   DataflowPipelineRunner))
-    self.assertTrue(
-        isinstance(create_runner('BlockingDataflowPipelineRunner'),
-                   DataflowPipelineRunner))
-    self.assertRaises(RuntimeError, create_runner, 'xyz')
-
-  def test_remote_runner_translation(self):
-    remote_runner = DataflowPipelineRunner()
-    p = Pipeline(remote_runner,
-                 options=PipelineOptions([
-                     '--dataflow_endpoint=ignored',
-                     '--job_name=test-job',
-                     '--project=test-project',
-                     '--staging_location=ignored',
-                     '--temp_location=/dev/null',
-                     '--no_auth=True'
-                 ]))
-
-    res = (p | ptransform.Create('create', [1, 2, 3])
-           | ptransform.FlatMap('do', lambda x: [(x, x)])
-           | ptransform.GroupByKey('gbk'))
-    remote_runner.job = apiclient.Job(p.options)
-    super(DataflowPipelineRunner, remote_runner).run(p)
-
-
-if __name__ == '__main__':
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/__init__.py b/sdks/python/google/cloud/dataflow/transforms/__init__.py
deleted file mode 100644
index 79da423..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/__init__.py
+++ /dev/null
@@ -1,23 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""PTransform and descendants."""
-
-# pylint: disable=wildcard-import
-from google.cloud.dataflow.transforms import combiners
-from google.cloud.dataflow.transforms.aggregator import *
-from google.cloud.dataflow.transforms.core import *
-from google.cloud.dataflow.transforms.ptransform import *
-from google.cloud.dataflow.transforms.timeutil import *
-from google.cloud.dataflow.transforms.util import *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/aggregator.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/aggregator.py b/sdks/python/google/cloud/dataflow/transforms/aggregator.py
deleted file mode 100644
index 505d115..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/aggregator.py
+++ /dev/null
@@ -1,105 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Support for user-defined Aggregators.
-
-Aggregators allow a pipeline to have the workers do custom aggregation
-of statistics about the data processed.  To update an aggregator's value,
-call aggregate_to() on the context passed to a DoFn.
-
-Example:
-import google.cloud.dataflow as df
-
-simple_counter = df.Aggregator('example-counter')
-
-class ExampleDoFn(df.DoFn):
-  def process(self, context):
-    context.aggregate_to(simple_counter, 1)
-    ...
-
-The aggregators defined here show up in the UI as "Custom counters."
-
-You can also query the combined value(s) of an aggregator by calling
-aggregated_value() or aggregated_values() on the result of running a
-pipeline.
-
-"""
-
-from __future__ import absolute_import
-
-from google.cloud.dataflow.transforms import core
-
-
-class Aggregator(object):
-  """A user-specified aggregator of statistics about pipeline data.
-
-  Args:
-    combine_fn: how to combine values input to the aggregation.
-      It must be one of these arithmetic functions:
-
-       - Python's built-in sum, min, max, any, and all.
-       - df.combiners.MeanCombineFn()
-
-      The default is sum of 64-bit ints.
-
-    input_type: describes the type that will be accepted as input
-      for aggregation; by default types appropriate to the combine_fn
-      are accepted.
-
-  Example uses::
-
-    import google.cloud.dataflow as df
-    simple_counter = df.Aggregator('example-counter')
-    complex_counter = df.Aggregator(
-        'other-counter', df.combiners.MeanCombineFn(), float)
-  """
-
-  def __init__(self, name, combine_fn=sum, input_type=int):
-    combine_fn = core.CombineFn.maybe_from_callable(combine_fn).for_input_type(
-        input_type)
-    if not _is_supported_kind(combine_fn):
-      raise ValueError(
-          'combine_fn %r (class %r) '
-          'does not map to a supported aggregation kind'
-          % (combine_fn, combine_fn.__class__))
-    self.name = name
-    self.combine_fn = combine_fn
-    self.input_type = input_type
-
-  def __str__(self):
-    return '<%s>' % self._str_internal()
-
-  def __repr__(self):
-    return '<%s at %s>' % (self._str_internal(), hex(id(self)))
-
-  def _str_internal(self):
-    """Internal helper function for both __str__ and __repr__."""
-    def get_name(thing):
-      try:
-        return thing.__name__
-      except AttributeError:
-        return thing.__class__.__name__
-
-    combine_fn_str = get_name(self.combine_fn)
-    input_arg = '(%s)' % get_name(self.input_type) if self.input_type else ''
-    if combine_fn_str == 'sum' and not input_arg:
-      combine_call = ''
-    else:
-      combine_call = ' %s%s' % (combine_fn_str, input_arg)
-    return 'Aggregator %s%s' % (self.name, combine_call)
-
-
-def _is_supported_kind(combine_fn):
-  # pylint: disable=g-import-not-at-top
-  from google.cloud.dataflow.internal.apiclient import metric_translations
-  return combine_fn.__class__ in metric_translations

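The docstring above pins down what an Aggregator accepts: a display name, a combine_fn drawn from sum/min/max/any/all or combiners.MeanCombineFn(), and an input type used to pick a concrete counter implementation. A short construction sketch follows, with import paths taken from the deleted files in this commit; the failing case relies on _is_supported_kind rejecting anything outside the translation table, so treat it as illustrative rather than definitive.

from google.cloud.dataflow.transforms import combiners
from google.cloud.dataflow.transforms.aggregator import Aggregator

words_seen = Aggregator('words-seen')                  # default: sum of 64-bit ints
longest = Aggregator('longest-word', max, int)         # max of ints
mean_len = Aggregator('mean-length', combiners.MeanCombineFn(), float)

try:
  Aggregator('unsupported', len)   # no matching aggregation kind
except ValueError:
  pass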
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/aggregator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/aggregator_test.py b/sdks/python/google/cloud/dataflow/transforms/aggregator_test.py
deleted file mode 100644
index db4c320..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/aggregator_test.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for Aggregator class."""
-
-import unittest
-
-import google.cloud.dataflow as df
-from google.cloud.dataflow.transforms import combiners
-from google.cloud.dataflow.transforms.aggregator import Aggregator
-
-
-class AggregatorTest(unittest.TestCase):
-
-  def test_str(self):
-    basic = Aggregator('a-name')
-    self.assertEqual('<Aggregator a-name SumInt64Fn(int)>', str(basic))
-
-    for_max = Aggregator('max-name', max)
-    self.assertEqual('<Aggregator max-name MaxInt64Fn(int)>', str(for_max))
-
-    for_float = Aggregator('f-name', sum, float)
-    self.assertEqual('<Aggregator f-name SumFloatFn(float)>', str(for_float))
-
-    for_mean = Aggregator('m-name', combiners.MeanCombineFn(), float)
-    self.assertEqual('<Aggregator m-name MeanFloatFn(float)>', str(for_mean))
-
-  def test_aggregation(self):
-
-    mean = combiners.MeanCombineFn()
-    mean.__name__ = 'mean'
-    counter_types = [
-        (sum, int, 6),
-        (min, int, 0),
-        (max, int, 3),
-        (mean, int, 1),
-        (sum, float, 6.0),
-        (min, float, 0.0),
-        (max, float, 3.0),
-        (mean, float, 1.5),
-        (any, int, True),
-        (all, float, False),
-    ]
-    aggregators = [Aggregator('%s_%s' % (f.__name__, t.__name__), f, t)
-                   for f, t, _ in counter_types]
-
-    class UpdateAggregators(df.DoFn):
-      def process(self, context):
-        for a in aggregators:
-          context.aggregate_to(a, context.element)
-
-    p = df.Pipeline('DirectPipelineRunner')
-    p | df.Create([0, 1, 2, 3]) | df.ParDo(UpdateAggregators())
-    res = p.run()
-    for (_, _, expected), a in zip(counter_types, aggregators):
-      actual = res.aggregated_values(a).values()[0]
-      self.assertEqual(expected, actual)
-      self.assertEqual(type(expected), type(actual))
-
-
-if __name__ == '__main__':
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/transforms/combiners.py b/sdks/python/google/cloud/dataflow/transforms/combiners.py
deleted file mode 100644
index e7f8242..0000000
--- a/sdks/python/google/cloud/dataflow/transforms/combiners.py
+++ /dev/null
@@ -1,523 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""A library of basic combiner PTransform subclasses."""
-
-from __future__ import absolute_import
-
-import heapq
-import itertools
-import random
-
-from google.cloud.dataflow.transforms import core
-from google.cloud.dataflow.transforms import cy_combiners
-from google.cloud.dataflow.transforms import ptransform
-from google.cloud.dataflow.typehints import Any
-from google.cloud.dataflow.typehints import Dict
-from google.cloud.dataflow.typehints import KV
-from google.cloud.dataflow.typehints import List
-from google.cloud.dataflow.typehints import Tuple
-from google.cloud.dataflow.typehints import TypeVariable
-from google.cloud.dataflow.typehints import Union
-from google.cloud.dataflow.typehints import with_input_types
-from google.cloud.dataflow.typehints import with_output_types
-
-
-__all__ = [
-    'Count',
-    'Mean',
-    'Sample',
-    'Top',
-    'ToDict',
-    'ToList',
-    ]
-
-
-class Mean(object):
-  """Combiners for computing arithmetic means of elements."""
-
-  class Globally(ptransform.PTransform):
-    """combiners.Mean.Globally computes the arithmetic mean of the elements."""
-
-    def apply(self, pcoll):
-      return pcoll | core.CombineGlobally(MeanCombineFn())
-
-  class PerKey(ptransform.PTransform):
-    """combiners.Mean.PerKey finds the means of the values for each key."""
-
-    def apply(self, pcoll):
-      return pcoll | core.CombinePerKey(MeanCombineFn())
-
-
-# TODO(laolu): This type signature is overly restrictive. This should be
-# more general.
-@with_input_types(Union[float, int, long])
-@with_output_types(float)
-class MeanCombineFn(core.CombineFn):
-  """CombineFn for computing an arithmetic mean."""
-
-  def create_accumulator(self):
-    return (0, 0)
-
-  def add_input(self, (sum_, count), element):
-    return sum_ + element, count + 1
-
-  def merge_accumulators(self, accumulators):
-    sums, counts = zip(*accumulators)
-    return sum(sums), sum(counts)
-
-  def extract_output(self, (sum_, count)):
-    if count == 0:
-      return float('NaN')
-    return sum_ / float(count)
-
-  def for_input_type(self, input_type):
-    if input_type is int:
-      return cy_combiners.MeanInt64Fn()
-    elif input_type is float:
-      return cy_combiners.MeanFloatFn()
-    else:
-      return self
-
-
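To make the (sum, count) accumulator above concrete, here is the CombineFn lifecycle driven by hand, in the same order a runner would use it: create an accumulator per bundle, add inputs, merge the partial accumulators, and extract the final value. This is a sketch only; inside a pipeline one would write pcoll | Mean.Globally() (or Mean.PerKey() for keyed data) rather than touching the CombineFn directly.

from google.cloud.dataflow.transforms.combiners import MeanCombineFn

fn = MeanCombineFn()
acc_a = fn.add_input(fn.create_accumulator(), 6)    # (6, 1)
acc_a = fn.add_input(acc_a, 2)                      # (8, 2)
acc_b = fn.add_input(fn.create_accumulator(), 4)    # (4, 1)
merged = fn.merge_accumulators([acc_a, acc_b])      # (12, 3)
assert fn.extract_output(merged) == 4.0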
-class Count(object):
-  """Combiners for counting elements."""
-
-  class Globally(ptransform.PTransform):
-    """combiners.Count.Globally counts the total number of elements."""
-
-    def apply(self, pcoll):
-      return pcoll | core.CombineGlobally(CountCombineFn())
-
-  class PerKey(ptransform.PTransform):
-    """combiners.Count.PerKey counts how many elements each unique key has."""
-
-    def apply(self, pcoll):
-      return pcoll | core.CombinePerKey(CountCombineFn())
-
-  class PerElement(ptransform.PTransform):
-    """combiners.Count.PerElement counts how many times each element occurs."""
-
-    def apply(self, pcoll):
-      paired_with_void_type = KV[pcoll.element_type, Any]
-      return (pcoll
-              | (core.Map('%s:PairWithVoid' % self.label, lambda x: (x, None))
-                 .with_output_types(paired_with_void_type))
-              | core.CombinePerKey(CountCombineFn()))
-
-
-@with_input_types(Any)
-@with_output_types(int)
-class CountCombineFn(core.CombineFn):
-  """CombineFn for computing PCollection size."""
-
-  def create_accumulator(self):
-    return 0
-
-  def add_inputs(self, accumulator, elements):
-    return accumulator + len(elements)
-
-  def merge_accumulators(self, accumulators):
-    return sum(accumulators)
-
-  def extract_output(self, accumulator):
-    return accumulator
-
-
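CountCombineFn just keeps a running total, and the three Count transforms differ only in how elements are routed into it: globally, per key, or per distinct element after pairing each element with None. A small pipeline sketch follows, reusing the df-style API that appears in the deleted test files of this commit; labels and exact result formatting are illustrative.

import google.cloud.dataflow as df
from google.cloud.dataflow.transforms import combiners

p = df.Pipeline('DirectPipelineRunner')
words = p | df.Create('words', ['a', 'b', 'a', 'c', 'a'])
total = words | combiners.Count.Globally()          # single-element PCollection: [5]
per_element = words | combiners.Count.PerElement()  # ('a', 3), ('b', 1), ('c', 1), in some order
p.run()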
-class Top(object):
-  """Combiners for obtaining extremal elements."""
-  # pylint: disable=no-self-argument
-
-  @ptransform.ptransform_fn
-  def Of(label, pcoll, n, compare, *args, **kwargs):
-    """Obtain a list of the compare-most N elements in a PCollection.
-
-    This transform will retrieve the n greatest elements in the PCollection
-    to which it is applied, where "greatest" is determined by the comparator
-    function supplied as the compare argument.
-
-    compare should be an implementation of "a < b" taking at least two arguments
-    (a and b). Additional arguments and side inputs specified in the apply call
-    become additional arguments to the comparator.
-
-    Args:
-      label: display label for transform processes.
-      pcoll: PCollection to process.
-      n: number of elements to extract from pcoll.
-      compare: as described above.
-      *args: as described above.
-      **kwargs: as described above.
-    """
-    return pcoll | core.CombineGlobally(
-        label, TopCombineFn(n, compare), *args, **kwargs)
-
-  @ptransform.ptransform_fn
-  def PerKey(label, pcoll, n, compare, *args, **kwargs):
-    """Identifies the compare-most N elements associated with each key.
-
-    This transform will produce a PCollection mapping unique keys in the input
-    PCollection to the n greatest elements with which they are associated, where
-    "greatest" is determined by the comparator function supplied as the compare
-    argument.
-
-    compare should be an implementation of "a < b" taking at least two arguments
-    (a and b). Additional arguments and side inputs specified in the apply call
-    become additional arguments to the comparator.
-
-    Args:
-      label: display label for transform processes.
-      pcoll: PCollection to process.
-      n: number of elements to extract from pcoll.
-      compare: as described above.
-      *args: as described above.
-      **kwargs: as described above.
-
-    Raises:
-      TypeCheckError: If the output type of the input PCollection is not
-        compatible with KV[A, B].
-    """
-    return pcoll | core.CombinePerKey(
-        label, TopCombineFn(n, compare), *args, **kwargs)
-
-  @ptransform.ptransform_fn
-  def Largest(label, pcoll, n):
-    """Obtain a list of the greatest N elements in a PCollection."""
-    return pcoll | Top.Of(label, n, lambda a, b: a < b)
-
-  @ptransform.ptransform_fn
-  def Smallest(label, pcoll, n):
-    """Obtain a list of the least N elements in a PCollection."""
-    return pcoll | Top.Of(label, n, lambda a, b: b < a)
-
-  @ptransform.ptransform_fn
-  def LargestPerKey(label, pcoll, n):
-    """Identifies the N greatest elements associated with each key."""
-    return pcoll | Top.PerKey(label, n, lambda a, b: a < b)
-
-  @ptransform.ptransform_fn
-  def SmallestPerKey(label, pcoll, n):
-    """Identifies the N least elements associated with each key."""
-    return pcoll | Top.PerKey(label, n, lambda a, b: b < a)
-
-
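Every entry point in Top above funnels into the same comparator contract: compare(a, b) answers "a < b", and the transform keeps the n greatest elements under that ordering. A usage sketch, under the same assumed df-style API as the earlier example:

import google.cloud.dataflow as df
from google.cloud.dataflow.transforms import combiners

p = df.Pipeline('DirectPipelineRunner')
scores = p | df.Create('scores', [('ann', 3), ('ann', 9), ('bob', 4), ('bob', 1)])
# Two largest pairs, ordered by the numeric field via a custom "a < b".
top2 = scores | combiners.Top.Of('top2', 2, lambda a, b: a[1] < b[1])
# Largest value per key; Top.LargestPerKey('best', 1) expands to the same call.
best = scores | combiners.Top.PerKey('best', 1, lambda a, b: a < b)
p.run()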
-T = TypeVariable('T')
-@with_input_types(T)
-@with_output_types(List[T])
-class TopCombineFn(core.CombineFn):
-  """CombineFn doing the combining for all of the Top transforms.
-
-  The comparator function supplied as an argument to the apply call invoking
-  TopCombineFn should be an implementation of "a < b" taking at least two
-  arguments (a and b). Additional arguments and side inputs specified in the
-  apply call become additional arguments to the comparator.
-  """
-
-  # Actually pickling the comparison operators (including, often, their
-  # entire globals) can be very expensive.  Instead refer to them by index
-  # in this dictionary, which is populated on construction (including
-  # unpickling).
-  compare_by_id = {}
-
-  def __init__(self, n, compare, _compare_id=None):  # pylint: disable=invalid-name
-    self._n = n
-    self._compare = compare
-    self._compare_id = _compare_id or id(compare)
-    TopCombineFn.compare_by_id[self._compare_id] = self._compare
-
-  def __reduce_ex__(self, _):
-    return TopCombineFn, (self._n, self._compare, self._compare_id)
-
-  class _HeapItem(object):
-    """A wrapper for values supporting arbitrary comparisons.
-
-    The heap implementation supplied by Python is a min heap that always uses
-    the __lt__ operator if one is available. This wrapper overloads __lt__,
-    letting us specify arbitrary precedence for elements in the PCollection.
-    """
-
-    def __init__(self, item, compare_id, *args, **kwargs):
-      # item:         wrapped item.
-      # compare:      an implementation of the pairwise < operator.
-      # args, kwargs: extra arguments supplied to the compare function.
-      self.item = item
-      self.compare_id = compare_id
-      self.args = args
-      self.kwargs = kwargs
-
-    def __lt__(self, other):
-      return TopCombineFn.compare_by_id[self.compare_id](
-          self.item, other.item, *self.args, **self.kwargs)
-
-  def create_accumulator(self, *args, **kwargs):
-    return []  # Empty heap.
-
-  def add_input(self, heap, element, *args, **kwargs):
-    # Note that because heap is a min heap, heappushpop will discard an
-    # incoming element that is lesser (according to compare) than the
-    # smallest element already in the heap (since that's what you would get
-    # if you pushed a small element on and popped the smallest element off).
-    # So, filtering a collection through a bounded min-heap keeps the largest
-    # elements in the collection.
-    item = self._HeapItem(element, self._compare_id, *args, **kwargs)
-    if len(heap) < self._n:
-      heapq.heappush(heap, item)
-    else:
-      heapq.heappushpop(heap, item)
-    return heap
-
-  def merge_accumulators(self, heaps, *args, **kwargs):
-    heap = []
-    for e in itertools.chain(*heaps):
-      if len(heap) < self._n:
-        heapq.heappush(heap, e)
-      else:
-        heapq.heappushpop(heap, e)
-    return heap
-
-  def extract_output(self, heap, *args, **kwargs):
-    # Items in the heap are heap-ordered. We put them in sorted order, but we
-    # have to use the reverse order because the result is expected to go
-    # from greatest to least (as defined by the supplied comparison function).
-    return [e.item for e in sorted(heap, reverse=True)]
-
-
-# Python's pickling is broken for nested classes.
-_HeapItem = TopCombineFn._HeapItem  # pylint: disable=protected-access
-
-
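The comments above describe the trick: a size-bounded min-heap whose __lt__ delegates to the user comparator. Stripped of the Dataflow machinery, the eviction behaviour is a few lines of plain heapq, shown here only to make the mechanism explicit:

import heapq

heap = []
for x in [5, 1, 9, 3, 7, 2]:
  if len(heap) < 3:
    heapq.heappush(heap, x)        # grow until the heap holds n candidates
  else:
    heapq.heappushpop(heap, x)     # push x, then drop whichever candidate is now smallest
assert sorted(heap, reverse=True) == [9, 7, 5]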
-class Largest(TopCombineFn):
-
-  def __init__(self, n):
-    super(Largest, self).__init__(n, lambda a, b: a < b)
-
-  def default_label(self):
-    return 'Largest(%s)' % self._n
-
-
-class Smallest(TopCombineFn):
-
-  def __init__(self, n):
-    super(Smallest, self).__init__(n, lambda a, b: b < a)
-
-  def default_label(self):
-    return 'Smallest(%s)' % self._n
-
-
-class Sample(object):
-  """Combiners for sampling n elements without replacement."""
-  # pylint: disable=no-self-argument
-
-  @ptransform.ptransform_fn
-  def FixedSizeGlobally(label, pcoll, n):
-    return pcoll | core.CombineGlobally(label, SampleCombineFn(n))
-
-  @ptransform.ptransform_fn
-  def FixedSizePerKey(label, pcoll, n):
-    return pcoll | core.CombinePerKey(label, SampleCombineFn(n))
-
-
-T = TypeVariable('T')
-@with_input_types(T)
-@with_output_types(List[T])
-class SampleCombineFn(core.CombineFn):
-  """CombineFn for all Sample transforms."""
-
-  def __init__(self, n):
-    super(SampleCombineFn, self).__init__()
-    # Most of this combiner's work is done by a TopCombineFn. We could just
-    # subclass TopCombineFn to make this class, but since sampling is not
-    # really a kind of Top operation, we use a TopCombineFn instance as a
-    # helper instead.
-    self._top_combiner = TopCombineFn(n, lambda a, b: a < b)
-
-  def create_accumulator(self):
-    return self._top_combiner.create_accumulator()
-
-  def add_input(self, heap, element):
-    # Before passing elements to the Top combiner, we pair them with random
-    # numbers. The elements with the n largest random number "keys" will be
-    # selected for the output.
-    return self._top_combiner.add_input(heap, (random.random(), element))
-
-  def merge_accumulators(self, heaps):
-    return self._top_combiner.merge_accumulators(heaps)
-
-  def extract_output(self, heap):
-    # Here we strip off the random number keys we added in add_input.
-    return [e for _, e in self._top_combiner.extract_output(heap)]
-
-
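SampleCombineFn reuses TopCombineFn by tagging every element with a uniform random key, so each element is equally likely to survive the top-n selection, giving a fixed-size sample without replacement. The same idea in plain Python, for illustration only; in a pipeline the entry points are Sample.FixedSizeGlobally(label, n) and Sample.FixedSizePerKey(label, n):

import heapq
import random

data = range(100)
keyed = [(random.random(), x) for x in data]        # attach uniform random keys
sample = [x for _, x in heapq.nlargest(5, keyed)]   # keep the 5 largest keys
assert len(sample) == 5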
-class _TupleCombineFnBase(core.CombineFn):
-
-  def __init__(self, *combiners):
-    self._combiners = [core.CombineFn.maybe_from_callable(c) for c in combiners]
-
-  def create_accumulator(self):
-    return [c.create_accumulator() for c in self._combiners]
-
-  def merge_accumulators(self, accumulators):
-    return [c.merge_accumulators(a)
-            for c, a in zip(self._combiners, zip(*accumulators))]
-
-  def extract_output(self, accumulator):
-    return tuple([c.extract_output(a)
-                  for c, a in zip(self._combiners, accumulator)])
-
-
-class TupleCombineFn(_TupleCombineFnBase):
-
-  def add_inputs(self, accumulator, elements):
-    return [c.add_inputs(a, e)
-            for c, a, e in zip(self._combiners, accumulator, zip(*elements))]
-
-  def with_common_input(self):
-    return SingleInputTupleCombineFn(*self._combiners)
-
-
-class SingleInputTupleCombineFn(_TupleCombineFnBase):
-
-  def add_inputs(self, accumulator, elements):
-    return [c.add_inputs(a, elements)
-            for c, a in zip(self._combiners, accumulator)]
-
-
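TupleCombineFn runs one combiner per tuple position over tuple-shaped elements, while SingleInputTupleCombineFn feeds the whole element to every combiner. A hand-driven sketch using combiners defined earlier in this file; it assumes the usual CombineFn.add_inputs default that folds add_input over the elements.

from google.cloud.dataflow.transforms.combiners import (
    CountCombineFn, MeanCombineFn, TupleCombineFn)

fn = TupleCombineFn(CountCombineFn(), MeanCombineFn())
acc = fn.create_accumulator()                 # [0, (0, 0)]
acc = fn.add_inputs(acc, [('a', 2.0), ('b', 4.0)])
assert fn.extract_output(acc) == (2, 3.0)     # count of field 0, mean of field 1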
-class ToList(ptransform.PTransform):
-  """A global CombineFn that condenses a PCollection into a single list."""
-
-  def __init__(self, label='ToList'):
-    super(ToList, self).__init__(label)
-
-  def apply(self, pcoll):
-    return pcoll | core.CombineGlobally(self.label, ToListCombineFn())
-
-
-T = TypeVariable('T')
-@with_input_types(T)
-@with_output_types(List[T])
-class ToListCombineFn(core.CombineFn):
-  """CombineFn for to_list."""
-
-  def create_accumulator(self):
-    return []
-
-  def add_input(self, accumulator, element):
-    accumulator.append(element)
-    return accumulator
-
-  def merge_accumulators(self, accumulators):
-    return sum(accumulators, [])
-
-  def extract_output(self, accumulator):
-    return accumulator
-
-
-class ToDict(ptransform.PTransform):
-  """A global CombineFn that condenses a PCollection into a single dict.
-
-  PCollections should consist of 2-tuples, notionally (key, value) pairs.
-  If multiple values are associated with the same key, only one of the values
-  will be present in the resulting dict.
-  """
-
-  def __init__(self, label='ToDict'):
-    super(ToDict, self).__init__(label)
-
-  def apply(self, pcoll):
-    return pcoll | core.CombineGlobally(self.label, ToDictCombineFn())
-
-
-K = TypeVariable('K')
-V = TypeVariable('V')
-@with_input_types(Tuple[K, V])
-@with_output_types(Dict[K, V])
-class ToDictCombineFn(core.CombineFn):
-  """CombineFn for to_dict."""
-
-  def create_accumulator(self):
-    return dict()
-
-  def add_input(self, accumulator, element):
-    key, value = element
-    accumulator[key] = value
-    return accumulator
-
-  def merge_accumulators(self, accumulators):
-    result = dict()
-    for a in accumulators:
-      result.update(a)
-    return result
-
-  def extract_output(self, accumulator):
-    return accumulator
-
-
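ToList and ToDict both collapse an entire PCollection into a single element; because ToDictCombineFn simply overwrites on key collisions, ToDict is only predictable when keys are unique. A short sketch, under the same assumed df-style API as the examples above:

import google.cloud.dataflow as df
from google.cloud.dataflow.transforms import combiners

p = df.Pipeline('DirectPipelineRunner')
pairs = p | df.Create('pairs', [('a', 1), ('b', 2), ('a', 3)])
as_dict = pairs | combiners.ToDict()           # one dict; 'a' keeps either 1 or 3
as_list = pairs | combiners.ToList('AsList')   # one list of all three pairs
p.run()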
-def curry_combine_fn(fn, args, kwargs):
-  if not args and not kwargs:
-    return fn
-
-  else:
-
-    class CurriedFn(core.CombineFn):
-      """CombineFn that applies extra arguments."""
-
-      def create_accumulator(self):
-        return fn.create_accumulator(*args, **kwargs)
-
-      def add_input(self, accumulator, element):
-        return fn.add_input(accumulator, element, *args, **kwargs)
-
-      def add_inputs(self, accumulator, elements):
-        return fn.add_inputs(accumulator, elements, *args, **kwargs)
-
-      def merge_accumulators(self, accumulators):
-        return fn.merge_accumulators(accumulators, *args, **kwargs)
-
-      def extract_output(self, accumulator):
-        return fn.extract_output(accumulator, *args, **kwargs)
-
-      def apply(self, elements):
-        return fn.apply(elements, *args, **kwargs)
-
-    return CurriedFn()
-
-
-class PhasedCombineFnExecutor(object):
-  """Executor for phases of combine operations."""
-
-  def __init__(self, phase, fn, args, kwargs):
-
-    self.combine_fn = curry_combine_fn(fn, args, kwargs)
-
-    if phase == 'all':
-      self.apply = self.full_combine
-    elif phase == 'add':
-      self.apply = self.add_only
-    elif phase == 'merge':
-      self.apply = self.merge_only
-    elif phase == 'extract':
-      self.apply = self.extract_only
-    else:
-      raise ValueError('Unexpected phase: %s' % phase)
-
-  def full_combine(self, elements):  # pylint: disable=invalid-name
-    return self.combine_fn.apply(elements)
-
-  def add_only(self, elements):  # pylint: disable=invalid-name
-    return self.combine_fn.add_inputs(
-        self.combine_fn.create_accumulator(), elements)
-
-  def merge_only(self, accumulators):  # pylint: disable=invalid-name
-    return self.combine_fn.merge_accumulators(accumulators)
-
-  def extract_only(self, accumulator):  # pylint: disable=invalid-name
-    return self.combine_fn.extract_output(accumulator)

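curry_combine_fn binds any extra arguments into a plain CombineFn, and PhasedCombineFnExecutor then exposes exactly one phase of it, presumably so the add, merge and extract steps of a combine can run in separate stages. A hand-driven sketch with CountCombineFn from the same module; there are no extra arguments here, so the currying step is a no-op.

from google.cloud.dataflow.transforms.combiners import CountCombineFn
from google.cloud.dataflow.transforms.combiners import PhasedCombineFnExecutor

add = PhasedCombineFnExecutor('add', CountCombineFn(), [], {})
merge = PhasedCombineFnExecutor('merge', CountCombineFn(), [], {})
extract = PhasedCombineFnExecutor('extract', CountCombineFn(), [], {})

partial_a = add.apply([1, 2, 3])   # accumulator for one bundle -> 3
partial_b = add.apply([4, 5])      # accumulator for another bundle -> 2
assert extract.apply(merge.apply([partial_a, partial_b])) == 5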
