beam-commits mailing list archives

From al...@apache.org
Subject [07/22] beam git commit: Rename google_cloud_dataflow and google_cloud_platform
Date Thu, 23 Feb 2017 01:23:02 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/runners/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/__init__.py b/sdks/python/apache_beam/runners/__init__.py
index a77c928..2b93c30 100644
--- a/sdks/python/apache_beam/runners/__init__.py
+++ b/sdks/python/apache_beam/runners/__init__.py
@@ -26,5 +26,5 @@ from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import create_runner
 
-from apache_beam.runners.google_cloud_dataflow.dataflow_runner import DataflowRunner
-from apache_beam.runners.google_cloud_dataflow.test_dataflow_runner import TestDataflowRunner
+from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
+from apache_beam.runners.dataflow.test_dataflow_runner import TestDataflowRunner
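
The re-export from apache_beam/runners/__init__.py keeps the public entry point stable; only the module path behind it moves. A rough sketch of the downstream effect (illustrative, not itself part of the diff):

    # Still works: the runner is re-exported from the runners package root.
    from apache_beam.runners import DataflowRunner

    # Code that imported the module by its old path
    # (apache_beam.runners.google_cloud_dataflow.dataflow_runner)
    # must now use the renamed package:
    from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner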

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/runners/dataflow/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/__init__.py b/sdks/python/apache_beam/runners/dataflow/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
new file mode 100644
index 0000000..1d86f2f
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+DataflowRunner implementation of MetricResults. It is in charge of
+responding to queries for current metric values by calling the Dataflow
+service.
+"""
+
+from apache_beam.metrics.metric import MetricResults
+
+
+# TODO(pabloem)(JIRA-1381) Implement this once metrics are queryable from
+# the Dataflow service.
+class DataflowMetrics(MetricResults):
+
+  def query(self, filter=None):
+    return {'counters': [],
+            'distributions': []}
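
Until the TODO above is resolved, query() always returns empty 'counters' and 'distributions' lists. A minimal consumption sketch of that result shape (illustrative only; the loop bodies never execute for now):

    from apache_beam.runners.dataflow.dataflow_metrics import DataflowMetrics

    results = DataflowMetrics().query()
    for counter in results['counters']:            # empty list for now
      print(counter)
    for distribution in results['distributions']:  # empty list for now
      print(distribution)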

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
new file mode 100644
index 0000000..5475ac7
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+Tests corresponding to the DataflowRunner implementation of MetricResults,
+the DataflowMetrics class.
+"""

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
new file mode 100644
index 0000000..5a8f547
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -0,0 +1,724 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A runner implementation that submits a job for remote execution.
+
+The runner will create a JSON description of the job graph and then submit it
+to the Dataflow Service for remote execution by a worker.
+"""
+
+import base64
+import logging
+import threading
+import time
+import traceback
+
+from apache_beam import coders
+from apache_beam import pvalue
+from apache_beam.internal import pickler
+from apache_beam.internal.gcp import json_value
+from apache_beam.pvalue import PCollectionView
+from apache_beam.runners.dataflow.dataflow_metrics import DataflowMetrics
+from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
+from apache_beam.runners.runner import PValueCache
+from apache_beam.runners.runner import PipelineResult
+from apache_beam.runners.runner import PipelineRunner
+from apache_beam.runners.runner import PipelineState
+from apache_beam.transforms.display import DisplayData
+from apache_beam.typehints import typehints
+from apache_beam.utils import names
+from apache_beam.utils.names import PropertyNames
+from apache_beam.utils.names import TransformNames
+from apache_beam.utils.pipeline_options import StandardOptions
+
+
+class DataflowRunner(PipelineRunner):
+  """A runner that creates job graphs and submits them for remote execution.
+
+  Every execution of the run() method will submit an independent job for
+  remote execution, consisting of the nodes reachable from the passed-in
+  node argument, or of the entire graph if the node is None. The run()
+  method returns after the service has created the job and will not wait
+  for the job to finish if blocking is set to False.
+  """
+
+  # Environment version information. It is passed to the service during
+  # job submission and is used by the service to establish what features
+  # are expected by the workers.
+  BATCH_ENVIRONMENT_MAJOR_VERSION = '5'
+  STREAMING_ENVIRONMENT_MAJOR_VERSION = '0'
+
+  def __init__(self, cache=None):
+    # Cache of CloudWorkflowStep protos generated while the runner
+    # "executes" a pipeline.
+    self._cache = cache if cache is not None else PValueCache()
+    self._unique_step_id = 0
+
+  def _get_unique_step_name(self):
+    self._unique_step_id += 1
+    return 's%s' % self._unique_step_id
+
+  @staticmethod
+  def poll_for_job_completion(runner, result):
+    """Polls for the specified job to finish running (successfully or not)."""
+    last_message_time = None
+    last_message_id = None
+
+    last_error_rank = float('-inf')
+    last_error_msg = None
+    last_job_state = None
+    # How long to wait after pipeline failure for the error
+    # message to show up giving the reason for the failure.
+    # It typically takes about 30 seconds.
+    final_countdown_timer_secs = 50.0
+    sleep_secs = 5.0
+
+    # Try to prioritize the user-level traceback, if any.
+    def rank_error(msg):
+      if 'work item was attempted' in msg:
+        return -1
+      elif 'Traceback' in msg:
+        return 1
+      else:
+        return 0
+
+    job_id = result.job_id()
+    while True:
+      response = runner.dataflow_client.get_job(job_id)
+      # If get() is called very soon after Create() the response may not contain
+      # an initialized 'currentState' field.
+      if response.currentState is not None:
+        if response.currentState != last_job_state:
+          logging.info('Job %s is in state %s', job_id, response.currentState)
+          last_job_state = response.currentState
+        if str(response.currentState) != 'JOB_STATE_RUNNING':
+          # Stop checking for new messages on timeout, explanatory
+          # message received, success, or a terminal job state caused
+          # by the user that therefore doesn't require explanation.
+          if (final_countdown_timer_secs <= 0.0
+              or last_error_msg is not None
+              or str(response.currentState) == 'JOB_STATE_DONE'
+              or str(response.currentState) == 'JOB_STATE_CANCELLED'
+              or str(response.currentState) == 'JOB_STATE_UPDATED'
+              or str(response.currentState) == 'JOB_STATE_DRAINED'):
+            break
+          # The job has failed; ensure we see any final error messages.
+          sleep_secs = 1.0      # poll faster during the final countdown
+          final_countdown_timer_secs -= sleep_secs
+      time.sleep(sleep_secs)
+
+      # Get all messages since beginning of the job run or since last message.
+      page_token = None
+      while True:
+        messages, page_token = runner.dataflow_client.list_messages(
+            job_id, page_token=page_token, start_time=last_message_time)
+        for m in messages:
+          if last_message_id is not None and m.id == last_message_id:
+            # Skip the first message if it is the last message we got in the
+            # previous round. This can happen because we use the
+            # last_message_time as a parameter of the query for new messages.
+            continue
+          last_message_time = m.time
+          last_message_id = m.id
+          # Skip empty messages.
+          if m.messageImportance is None:
+            continue
+          logging.info(
+              '%s: %s: %s: %s', m.id, m.time, m.messageImportance,
+              m.messageText)
+          if str(m.messageImportance) == 'JOB_MESSAGE_ERROR':
+            if rank_error(m.messageText) >= last_error_rank:
+              last_error_rank = rank_error(m.messageText)
+              last_error_msg = m.messageText
+        if not page_token:
+          break
+
+    result._job = response
+    runner.last_error_msg = last_error_msg
+
+  def run(self, pipeline):
+    """Remotely executes entire pipeline or parts reachable from node."""
+    # Import here to avoid adding the dependency for local running scenarios.
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam.runners.dataflow.internal import apiclient
+    self.job = apiclient.Job(pipeline.options)
+
+    # The superclass's run will trigger a traversal of all reachable nodes.
+    super(DataflowRunner, self).run(pipeline)
+
+    standard_options = pipeline.options.view_as(StandardOptions)
+    if standard_options.streaming:
+      job_version = DataflowRunner.STREAMING_ENVIRONMENT_MAJOR_VERSION
+    else:
+      job_version = DataflowRunner.BATCH_ENVIRONMENT_MAJOR_VERSION
+
+    # Get a Dataflow API client and set its options
+    self.dataflow_client = apiclient.DataflowApplicationClient(
+        pipeline.options, job_version)
+
+    # Create the job
+    return DataflowPipelineResult(
+        self.dataflow_client.create_job(self.job), self)
+
+  def _get_typehint_based_encoding(self, typehint, window_coder):
+    """Returns an encoding based on a typehint object."""
+    return self._get_cloud_encoding(self._get_coder(typehint,
+                                                    window_coder=window_coder))
+
+  @staticmethod
+  def _get_coder(typehint, window_coder):
+    """Returns a coder based on a typehint object."""
+    if window_coder:
+      return coders.WindowedValueCoder(
+          coders.registry.get_coder(typehint),
+          window_coder=window_coder)
+    else:
+      return coders.registry.get_coder(typehint)
+
+  def _get_cloud_encoding(self, coder):
+    """Returns an encoding based on a coder object."""
+    if not isinstance(coder, coders.Coder):
+      raise TypeError('Coder object must inherit from coders.Coder: %s.' %
+                      str(coder))
+    return coder.as_cloud_object()
+
+  def _get_side_input_encoding(self, input_encoding):
+    """Returns an encoding for the output of a view transform.
+
+    Args:
+      input_encoding: encoding of current transform's input. Side inputs need
+        this because the service will check that input and output types match.
+
+    Returns:
+      An encoding that matches the output and input encoding. This is essential
+      for the View transforms introduced to produce side inputs to a ParDo.
+    """
+    return {
+        '@type': input_encoding['@type'],
+        'component_encodings': [input_encoding]
+    }
+
+  def _get_encoded_output_coder(self, transform_node, window_value=True):
+    """Returns the cloud encoding of the coder for the output of a transform."""
+    if (len(transform_node.outputs) == 1
+        and transform_node.outputs[0].element_type is not None):
+      # TODO(robertwb): Handle type hints for multi-output transforms.
+      element_type = transform_node.outputs[0].element_type
+    else:
+      # TODO(silviuc): Remove this branch (and assert) when typehints are
+      # propagated everywhere. Returning an 'Any' as type hint will trigger
+      # usage of the fallback coder (i.e., cPickler).
+      element_type = typehints.Any
+    if window_value:
+      window_coder = (
+          transform_node.outputs[0].windowing.windowfn.get_window_coder())
+    else:
+      window_coder = None
+    return self._get_typehint_based_encoding(
+        element_type, window_coder=window_coder)
+
+  def _add_step(self, step_kind, step_label, transform_node, side_tags=()):
+    """Creates a Step object and adds it to the cache."""
+    # Import here to avoid adding the dependency for local running scenarios.
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam.runners.dataflow.internal import apiclient
+    step = apiclient.Step(step_kind, self._get_unique_step_name())
+    self.job.proto.steps.append(step.proto)
+    step.add_property(PropertyNames.USER_NAME, step_label)
+    # Cache the node/step association for the main output of the transform node.
+    self._cache.cache_output(transform_node, None, step)
+    # If side_tags is not () then this is a multi-output transform node and we
+    # need to cache the (node, tag, step) for each of the tags used to access
+    # the outputs. This is essential because the keys used to search in the
+    # cache always contain the tag.
+    for tag in side_tags:
+      self._cache.cache_output(transform_node, tag, step)
+
+    # Finally, we add the display data items to the pipeline step.
+    # If the transform contains no display data then an empty list is added.
+    step.add_property(
+        PropertyNames.DISPLAY_DATA,
+        [item.get_dict() for item in
+         DisplayData.create_from(transform_node.transform).items])
+
+    return step
+
+  def run_Create(self, transform_node):
+    transform = transform_node.transform
+    step = self._add_step(TransformNames.CREATE_PCOLLECTION,
+                          transform_node.full_label, transform_node)
+    # TODO(silviuc): Eventually use a coder based on typecoders.
+    # Note that we base64-encode values here so that the service will accept
+    # the values.
+    element_coder = coders.PickleCoder()
+    step.add_property(
+        PropertyNames.ELEMENT,
+        [base64.b64encode(element_coder.encode(v))
+         for v in transform.value])
+    # The service expects a WindowedValueCoder here, so we wrap the actual
+    # encoding in a WindowedValueCoder.
+    step.encoding = self._get_cloud_encoding(
+        coders.WindowedValueCoder(element_coder))
+    step.add_property(
+        PropertyNames.OUTPUT_INFO,
+        [{PropertyNames.USER_NAME: (
+            '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
+          PropertyNames.ENCODING: step.encoding,
+          PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
+
+  def run_CreatePCollectionView(self, transform_node):
+    step = self._add_step(TransformNames.COLLECTION_TO_SINGLETON,
+                          transform_node.full_label, transform_node)
+    input_tag = transform_node.inputs[0].tag
+    input_step = self._cache.get_pvalue(transform_node.inputs[0])
+    step.add_property(
+        PropertyNames.PARALLEL_INPUT,
+        {'@type': 'OutputReference',
+         PropertyNames.STEP_NAME: input_step.proto.name,
+         PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
+    step.encoding = self._get_side_input_encoding(input_step.encoding)
+    step.add_property(
+        PropertyNames.OUTPUT_INFO,
+        [{PropertyNames.USER_NAME: (
+            '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
+          PropertyNames.ENCODING: step.encoding,
+          PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
+
+  def run_Flatten(self, transform_node):
+    step = self._add_step(TransformNames.FLATTEN,
+                          transform_node.full_label, transform_node)
+    inputs = []
+    for one_input in transform_node.inputs:
+      input_step = self._cache.get_pvalue(one_input)
+      inputs.append(
+          {'@type': 'OutputReference',
+           PropertyNames.STEP_NAME: input_step.proto.name,
+           PropertyNames.OUTPUT_NAME: input_step.get_output(one_input.tag)})
+    step.add_property(PropertyNames.INPUTS, inputs)
+    step.encoding = self._get_encoded_output_coder(transform_node)
+    step.add_property(
+        PropertyNames.OUTPUT_INFO,
+        [{PropertyNames.USER_NAME: (
+            '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
+          PropertyNames.ENCODING: step.encoding,
+          PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
+
+  def apply_GroupByKey(self, transform, pcoll):
+    # Infer coder of parent.
+    #
+    # TODO(ccy): make Coder inference and checking less specialized and more
+    # comprehensive.
+    parent = pcoll.producer
+    if parent:
+      coder = parent.transform._infer_output_coder()  # pylint: disable=protected-access
+    else:
+      coder = None
+    if not coder:
+      coder = self._get_coder(pcoll.element_type or typehints.Any, None)
+    if not coder.is_kv_coder():
+      raise ValueError(('Coder for the GroupByKey operation "%s" is not a '
+                        'key-value coder: %s.') % (transform.label,
+                                                   coder))
+    # TODO(robertwb): Update the coder itself if it changed.
+    coders.registry.verify_deterministic(
+        coder.key_coder(), 'GroupByKey operation "%s"' % transform.label)
+
+    return pvalue.PCollection(pcoll.pipeline)
+
+  def run_GroupByKey(self, transform_node):
+    input_tag = transform_node.inputs[0].tag
+    input_step = self._cache.get_pvalue(transform_node.inputs[0])
+    step = self._add_step(
+        TransformNames.GROUP, transform_node.full_label, transform_node)
+    step.add_property(
+        PropertyNames.PARALLEL_INPUT,
+        {'@type': 'OutputReference',
+         PropertyNames.STEP_NAME: input_step.proto.name,
+         PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
+    step.encoding = self._get_encoded_output_coder(transform_node)
+    step.add_property(
+        PropertyNames.OUTPUT_INFO,
+        [{PropertyNames.USER_NAME: (
+            '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
+          PropertyNames.ENCODING: step.encoding,
+          PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
+    windowing = transform_node.transform.get_windowing(
+        transform_node.inputs)
+    step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(windowing))
+
+  def run_ParDo(self, transform_node):
+    transform = transform_node.transform
+    input_tag = transform_node.inputs[0].tag
+    input_step = self._cache.get_pvalue(transform_node.inputs[0])
+
+    # Attach side inputs.
+    si_dict = {}
+    # We must call self._cache.get_pvalue exactly once due to refcounting.
+    si_labels = {}
+    for side_pval in transform_node.side_inputs:
+      si_labels[side_pval] = self._cache.get_pvalue(side_pval).step_name
+    lookup_label = lambda side_pval: si_labels[side_pval]
+    for side_pval in transform_node.side_inputs:
+      assert isinstance(side_pval, PCollectionView)
+      si_label = lookup_label(side_pval)
+      si_dict[si_label] = {
+          '@type': 'OutputReference',
+          PropertyNames.STEP_NAME: si_label,
+          PropertyNames.OUTPUT_NAME: PropertyNames.OUT}
+
+    # Now create the step for the ParDo transform being handled.
+    step = self._add_step(
+        TransformNames.DO, transform_node.full_label, transform_node,
+        transform_node.transform.side_output_tags)
+    fn_data = self._pardo_fn_data(transform_node, lookup_label)
+    step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(fn_data))
+    step.add_property(
+        PropertyNames.PARALLEL_INPUT,
+        {'@type': 'OutputReference',
+         PropertyNames.STEP_NAME: input_step.proto.name,
+         PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
+    # Add side inputs if any.
+    step.add_property(PropertyNames.NON_PARALLEL_INPUTS, si_dict)
+
+    # Generate description for main output and side outputs. The output names
+    # will be 'out' for main output and 'out_<tag>' for a tagged output.
+    # Using 'out' as a tag will not clash with the name for main since it will
+    # be transformed into 'out_out' internally.
+    outputs = []
+    step.encoding = self._get_encoded_output_coder(transform_node)
+
+    # Add the main output to the description.
+    outputs.append(
+        {PropertyNames.USER_NAME: (
+            '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
+         PropertyNames.ENCODING: step.encoding,
+         PropertyNames.OUTPUT_NAME: PropertyNames.OUT})
+    for side_tag in transform.side_output_tags:
+      # The assumption here is that side outputs will have the same typehint
+      # and coder as the main output. This is certainly the case right now
+      # but conceivably it could change in the future.
+      outputs.append(
+          {PropertyNames.USER_NAME: (
+              '%s.%s' % (transform_node.full_label, side_tag)),
+           PropertyNames.ENCODING: step.encoding,
+           PropertyNames.OUTPUT_NAME: (
+               '%s_%s' % (PropertyNames.OUT, side_tag))})
+
+    step.add_property(PropertyNames.OUTPUT_INFO, outputs)
+
+  @staticmethod
+  def _pardo_fn_data(transform_node, get_label):
+    transform = transform_node.transform
+    si_tags_and_types = [  # pylint: disable=protected-access
+        (get_label(side_pval), side_pval.__class__, side_pval._view_options())
+        for side_pval in transform_node.side_inputs]
+    return (transform.fn, transform.args, transform.kwargs, si_tags_and_types,
+            transform_node.inputs[0].windowing)
+
+  def apply_CombineValues(self, transform, pcoll):
+    return pvalue.PCollection(pcoll.pipeline)
+
+  def run_CombineValues(self, transform_node):
+    transform = transform_node.transform
+    input_tag = transform_node.inputs[0].tag
+    input_step = self._cache.get_pvalue(transform_node.inputs[0])
+    step = self._add_step(
+        TransformNames.COMBINE, transform_node.full_label, transform_node)
+    # Combiner functions do not take deferred side inputs (i.e. PValues) and
+    # therefore the code to handle extra args/kwargs is simpler than for the
+    # DoFns of the ParDo transform. The last, empty argument is where side
+    # input information would go.
+    fn_data = (transform.fn, transform.args, transform.kwargs, ())
+    step.add_property(PropertyNames.SERIALIZED_FN,
+                      pickler.dumps(fn_data))
+    step.add_property(
+        PropertyNames.PARALLEL_INPUT,
+        {'@type': 'OutputReference',
+         PropertyNames.STEP_NAME: input_step.proto.name,
+         PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
+    # Note that the accumulator must not have a WindowedValue encoding, while
+    # the output of this step does in fact have a WindowedValue encoding.
+    accumulator_encoding = self._get_encoded_output_coder(transform_node,
+                                                          window_value=False)
+    output_encoding = self._get_encoded_output_coder(transform_node)
+
+    step.encoding = output_encoding
+    step.add_property(PropertyNames.ENCODING, accumulator_encoding)
+    # Generate description for the main output 'out'.
+    outputs = []
+    # Add the main output to the description.
+    outputs.append(
+        {PropertyNames.USER_NAME: (
+            '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
+         PropertyNames.ENCODING: step.encoding,
+         PropertyNames.OUTPUT_NAME: PropertyNames.OUT})
+    step.add_property(PropertyNames.OUTPUT_INFO, outputs)
+
+  def run_Read(self, transform_node):
+    transform = transform_node.transform
+    step = self._add_step(
+        TransformNames.READ, transform_node.full_label, transform_node)
+    # TODO(mairbek): refactor if-else tree to use registerable functions.
+    # Initialize the source specific properties.
+
+    if not hasattr(transform.source, 'format'):
+      # If a format is not set, we assume the source to be a custom source.
+      source_dict = {}
+
+      source_dict['spec'] = {
+          '@type': names.SOURCE_TYPE,
+          names.SERIALIZED_SOURCE_KEY: pickler.dumps(transform.source)
+      }
+
+      try:
+        source_dict['metadata'] = {
+            'estimated_size_bytes': json_value.get_typed_value_descriptor(
+                transform.source.estimate_size())
+        }
+      except Exception:  # pylint: disable=broad-except
+        # Size estimation is best effort. So we log the error and continue.
+        logging.info(
+            'Could not estimate size of source %r due to an exception: %s',
+            transform.source, traceback.format_exc())
+
+      step.add_property(PropertyNames.SOURCE_STEP_INPUT,
+                        source_dict)
+    elif transform.source.format == 'text':
+      step.add_property(PropertyNames.FILE_PATTERN, transform.source.path)
+    elif transform.source.format == 'bigquery':
+      step.add_property(PropertyNames.BIGQUERY_EXPORT_FORMAT, 'FORMAT_AVRO')
+      # TODO(silviuc): Add table validation if transform.source.validate.
+      if transform.source.table_reference is not None:
+        step.add_property(PropertyNames.BIGQUERY_DATASET,
+                          transform.source.table_reference.datasetId)
+        step.add_property(PropertyNames.BIGQUERY_TABLE,
+                          transform.source.table_reference.tableId)
+        # If project owning the table was not specified then the project owning
+        # the workflow (current project) will be used.
+        if transform.source.table_reference.projectId is not None:
+          step.add_property(PropertyNames.BIGQUERY_PROJECT,
+                            transform.source.table_reference.projectId)
+      elif transform.source.query is not None:
+        step.add_property(PropertyNames.BIGQUERY_QUERY, transform.source.query)
+        step.add_property(PropertyNames.BIGQUERY_USE_LEGACY_SQL,
+                          transform.source.use_legacy_sql)
+        step.add_property(PropertyNames.BIGQUERY_FLATTEN_RESULTS,
+                          transform.source.flatten_results)
+      else:
+        raise ValueError('BigQuery source %r must specify either a table or'
+                         ' a query' % transform.source)
+    elif transform.source.format == 'pubsub':
+      standard_options = (
+          transform_node.inputs[0].pipeline.options.view_as(StandardOptions))
+      if not standard_options.streaming:
+        raise ValueError('PubSubSource is currently available for use only in '
+                         'streaming pipelines.')
+      step.add_property(PropertyNames.PUBSUB_TOPIC, transform.source.topic)
+      if transform.source.subscription:
+        step.add_property(PropertyNames.PUBSUB_SUBSCRIPTION,
+                          transform.source.subscription)
+      if transform.source.id_label:
+        step.add_property(PropertyNames.PUBSUB_ID_LABEL,
+                          transform.source.id_label)
+    else:
+      raise ValueError(
+          'Source %r has unexpected format %s.' % (
+              transform.source, transform.source.format))
+
+    if not hasattr(transform.source, 'format'):
+      step.add_property(PropertyNames.FORMAT, names.SOURCE_FORMAT)
+    else:
+      step.add_property(PropertyNames.FORMAT, transform.source.format)
+
+    # Wrap coder in WindowedValueCoder: this is necessary as the encoding of a
+    # step should be the type of value outputted by each step.  Read steps
+    # automatically wrap output values in a WindowedValue wrapper, if necessary.
+    # This is also necessary for proper encoding for size estimation.
+    coder = coders.WindowedValueCoder(transform._infer_output_coder())  # pylint: disable=protected-access
+
+    step.encoding = self._get_cloud_encoding(coder)
+    step.add_property(
+        PropertyNames.OUTPUT_INFO,
+        [{PropertyNames.USER_NAME: (
+            '%s.%s' % (transform_node.full_label, PropertyNames.OUT)),
+          PropertyNames.ENCODING: step.encoding,
+          PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
+
+  def run__NativeWrite(self, transform_node):
+    transform = transform_node.transform
+    input_tag = transform_node.inputs[0].tag
+    input_step = self._cache.get_pvalue(transform_node.inputs[0])
+    step = self._add_step(
+        TransformNames.WRITE, transform_node.full_label, transform_node)
+    # TODO(mairbek): refactor if-else tree to use registerable functions.
+    # Initialize the sink specific properties.
+    if transform.sink.format == 'text':
+      # Note that it is important to use typed properties (@type/value dicts)
+      # for non-string properties and also for empty strings. For example,
+      # in the code below num_shards must be typed, as must file_name_suffix
+      # and shard_name_template (both of which could be empty strings).
+      step.add_property(
+          PropertyNames.FILE_NAME_PREFIX, transform.sink.file_name_prefix,
+          with_type=True)
+      step.add_property(
+          PropertyNames.FILE_NAME_SUFFIX, transform.sink.file_name_suffix,
+          with_type=True)
+      step.add_property(
+          PropertyNames.SHARD_NAME_TEMPLATE, transform.sink.shard_name_template,
+          with_type=True)
+      if transform.sink.num_shards > 0:
+        step.add_property(
+            PropertyNames.NUM_SHARDS, transform.sink.num_shards, with_type=True)
+      # TODO(silviuc): Implement sink validation.
+      step.add_property(PropertyNames.VALIDATE_SINK, False, with_type=True)
+    elif transform.sink.format == 'bigquery':
+      # TODO(silviuc): Add table validation if transform.sink.validate.
+      step.add_property(PropertyNames.BIGQUERY_DATASET,
+                        transform.sink.table_reference.datasetId)
+      step.add_property(PropertyNames.BIGQUERY_TABLE,
+                        transform.sink.table_reference.tableId)
+      # If project owning the table was not specified then the project owning
+      # the workflow (current project) will be used.
+      if transform.sink.table_reference.projectId is not None:
+        step.add_property(PropertyNames.BIGQUERY_PROJECT,
+                          transform.sink.table_reference.projectId)
+      step.add_property(PropertyNames.BIGQUERY_CREATE_DISPOSITION,
+                        transform.sink.create_disposition)
+      step.add_property(PropertyNames.BIGQUERY_WRITE_DISPOSITION,
+                        transform.sink.write_disposition)
+      if transform.sink.table_schema is not None:
+        step.add_property(
+            PropertyNames.BIGQUERY_SCHEMA, transform.sink.schema_as_json())
+    elif transform.sink.format == 'pubsub':
+      standard_options = (
+          transform_node.inputs[0].pipeline.options.view_as(StandardOptions))
+      if not standard_options.streaming:
+        raise ValueError('PubSubSink is currently available for use only in '
+                         'streaming pipelines.')
+      step.add_property(PropertyNames.PUBSUB_TOPIC, transform.sink.topic)
+    else:
+      raise ValueError(
+          'Sink %r has unexpected format %s.' % (
+              transform.sink, transform.sink.format))
+    step.add_property(PropertyNames.FORMAT, transform.sink.format)
+
+    # Wrap coder in WindowedValueCoder: this is necessary for proper encoding
+    # for size estimation.
+    coder = coders.WindowedValueCoder(transform.sink.coder)
+    step.encoding = self._get_cloud_encoding(coder)
+    step.add_property(PropertyNames.ENCODING, step.encoding)
+    step.add_property(
+        PropertyNames.PARALLEL_INPUT,
+        {'@type': 'OutputReference',
+         PropertyNames.STEP_NAME: input_step.proto.name,
+         PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
+
+
+class DataflowPipelineResult(PipelineResult):
+  """Represents the state of a pipeline run on the Dataflow service."""
+
+  def __init__(self, job, runner):
+    """Job is a Job message from the Dataflow API."""
+    self._job = job
+    self._runner = runner
+
+  def job_id(self):
+    return self._job.id
+
+  def metrics(self):
+    return DataflowMetrics()
+
+  @property
+  def has_job(self):
+    return self._job is not None
+
+  @property
+  def state(self):
+    """Return the current state of the remote job.
+
+    Returns:
+      A PipelineState object.
+    """
+    if not self.has_job:
+      return PipelineState.UNKNOWN
+
+    values_enum = dataflow_api.Job.CurrentStateValueValuesEnum
+    api_jobstate_map = {
+        values_enum.JOB_STATE_UNKNOWN: PipelineState.UNKNOWN,
+        values_enum.JOB_STATE_STOPPED: PipelineState.STOPPED,
+        values_enum.JOB_STATE_RUNNING: PipelineState.RUNNING,
+        values_enum.JOB_STATE_DONE: PipelineState.DONE,
+        values_enum.JOB_STATE_FAILED: PipelineState.FAILED,
+        values_enum.JOB_STATE_CANCELLED: PipelineState.CANCELLED,
+        values_enum.JOB_STATE_UPDATED: PipelineState.UPDATED,
+        values_enum.JOB_STATE_DRAINING: PipelineState.DRAINING,
+        values_enum.JOB_STATE_DRAINED: PipelineState.DRAINED,
+    }
+
+    return (api_jobstate_map[self._job.currentState] if self._job.currentState
+            else PipelineState.UNKNOWN)
+
+  def _is_in_terminal_state(self):
+    if not self.has_job:
+      return True
+
+    return self.state in [
+        PipelineState.STOPPED, PipelineState.DONE, PipelineState.FAILED,
+        PipelineState.CANCELLED, PipelineState.DRAINED]
+
+  def wait_until_finish(self, duration=None):
+    if not self._is_in_terminal_state():
+      if not self.has_job:
+        raise IOError('Failed to get the Dataflow job id.')
+      if duration:
+        raise NotImplementedError(
+            'DataflowRunner does not support duration argument.')
+
+      thread = threading.Thread(
+          target=DataflowRunner.poll_for_job_completion,
+          args=(self._runner, self))
+
+      # Mark the thread as a daemon thread so a keyboard interrupt on the main
+      # thread will terminate everything. This is also the reason we will not
+      # use thread.join() to wait for the polling thread.
+      thread.daemon = True
+      thread.start()
+      while thread.isAlive():
+        time.sleep(5.0)
+      if self.state != PipelineState.DONE:
+        # TODO(BEAM-1290): Consider converting this to an error log based on the
+        # resolution of the issue.
+        raise DataflowRuntimeException(
+            'Dataflow pipeline failed. State: %s, Error:\n%s' %
+            (self.state, getattr(self._runner, 'last_error_msg', None)), self)
+    return self.state
+
+  def __str__(self):
+    return '<%s %s %s>' % (
+        self.__class__.__name__,
+        self.job_id(),
+        self.state)
+
+  def __repr__(self):
+    return '<%s %s at %s>' % (self.__class__.__name__, self._job, hex(id(self)))
+
+
+class DataflowRuntimeException(Exception):
+  """Indicates an error has occurred in running this pipeline."""
+
+  def __init__(self, msg, result):
+    super(DataflowRuntimeException, self).__init__(msg)
+    self.result = result
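
Taken together, run() serializes the pipeline graph into Step protos, submits them through apiclient.DataflowApplicationClient.create_job(), and returns a DataflowPipelineResult whose wait_until_finish() drives poll_for_job_completion() on a daemon thread until a terminal state is reached. A minimal end-to-end sketch of driving this runner (project, bucket and job name values are placeholders, and the import paths are those used in this commit):

    import apache_beam as beam
    from apache_beam.runners.runner import PipelineState
    from apache_beam.utils.pipeline_options import PipelineOptions

    options = PipelineOptions([
        '--runner=DataflowRunner',
        '--project=my-gcp-project',                   # placeholder
        '--staging_location=gs://my-bucket/staging',  # placeholder
        '--temp_location=gs://my-bucket/temp',        # placeholder
        '--job_name=example-wordcount',
    ])

    p = beam.Pipeline(options=options)
    (p
     | 'Create' >> beam.Create(['to be or not to be'])
     | 'Split' >> beam.FlatMap(lambda line: line.split())
     | 'Pair' >> beam.Map(lambda word: (word, 1))
     | 'Count' >> beam.CombinePerKey(sum))

    result = p.run()            # returns a DataflowPipelineResult
    result.wait_until_finish()  # blocks, polling the Dataflow service
    assert result.state == PipelineState.DONE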

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
new file mode 100644
index 0000000..4a538b1
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the DataflowRunner class."""
+
+import unittest
+
+import mock
+
+from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult
+from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException
+from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
+
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py import base_api
+except ImportError:
+  base_api = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+class DataflowRunnerTest(unittest.TestCase):
+
+  def test_dataflow_runner_has_metrics(self):
+    df_result = DataflowPipelineResult('somejob', 'somerunner')
+    self.assertTrue(df_result.metrics())
+    self.assertTrue(df_result.metrics().query())
+
+  @unittest.skipIf(base_api is None, 'GCP dependencies are not installed')
+  @mock.patch('time.sleep', return_value=None)
+  def test_wait_until_finish(self, patched_time_sleep):
+    values_enum = dataflow_api.Job.CurrentStateValueValuesEnum
+
+    class MockDataflowRunner(object):
+
+      def __init__(self, final_state):
+        self.dataflow_client = mock.MagicMock()
+        self.job = mock.MagicMock()
+        self.job.currentState = values_enum.JOB_STATE_UNKNOWN
+
+        def get_job_side_effect(*args, **kwargs):
+          self.job.currentState = final_state
+          return mock.DEFAULT
+
+        self.dataflow_client.get_job = mock.MagicMock(
+            return_value=self.job, side_effect=get_job_side_effect)
+        self.dataflow_client.list_messages = mock.MagicMock(
+            return_value=([], None))
+
+    with self.assertRaisesRegexp(
+        DataflowRuntimeException, 'Dataflow pipeline failed. State: FAILED'):
+      failed_runner = MockDataflowRunner(values_enum.JOB_STATE_FAILED)
+      failed_result = DataflowPipelineResult(failed_runner.job, failed_runner)
+      failed_result.wait_until_finish()
+
+    succeeded_runner = MockDataflowRunner(values_enum.JOB_STATE_DONE)
+    succeeded_result = DataflowPipelineResult(
+        succeeded_runner.job, succeeded_runner)
+    succeeded_result.wait_until_finish()
+
+
+if __name__ == '__main__':
+  unittest.main()
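
In test_wait_until_finish, patching time.sleep lets both the polling loop in poll_for_job_completion() and the five-second wait loop in wait_until_finish() run instantly, and the get_job side effect moves the mocked job to the desired terminal state on the first call. The FAILED case is expected to surface as a DataflowRuntimeException, while the DONE case returns normally.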

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/runners/dataflow/internal/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/__init__.py b/sdks/python/apache_beam/runners/dataflow/internal/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/internal/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
new file mode 100644
index 0000000..0dab676
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -0,0 +1,726 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Dataflow client utility functions."""
+
+import codecs
+import getpass
+import json
+import logging
+import os
+import re
+import time
+from StringIO import StringIO
+from datetime import datetime
+
+from apitools.base.py import encoding
+from apitools.base.py import exceptions
+
+from apache_beam import utils
+from apache_beam.internal.auth import get_service_credentials
+from apache_beam.internal.gcp.json_value import to_json_value
+from apache_beam.io.gcp.internal.clients import storage
+from apache_beam.runners.dataflow.internal.clients import dataflow
+from apache_beam.transforms import cy_combiners
+from apache_beam.transforms.display import DisplayData
+from apache_beam.utils import dependency
+from apache_beam.utils import retry
+from apache_beam.utils.dependency import get_required_container_version
+from apache_beam.utils.dependency import get_sdk_name_and_version
+from apache_beam.utils.names import PropertyNames
+from apache_beam.utils.pipeline_options import DebugOptions
+from apache_beam.utils.pipeline_options import GoogleCloudOptions
+from apache_beam.utils.pipeline_options import StandardOptions
+from apache_beam.utils.pipeline_options import WorkerOptions
+
+
+class Step(object):
+  """Wrapper for a dataflow Step protobuf."""
+
+  def __init__(self, step_kind, step_name, additional_properties=None):
+    self.step_kind = step_kind
+    self.step_name = step_name
+    self.proto = dataflow.Step(kind=step_kind, name=step_name)
+    self.proto.properties = {}
+    self._additional_properties = []
+
+    if additional_properties is not None:
+      for (n, v, t) in additional_properties:
+        self.add_property(n, v, t)
+
+  def add_property(self, name, value, with_type=False):
+    self._additional_properties.append((name, value, with_type))
+    self.proto.properties.additionalProperties.append(
+        dataflow.Step.PropertiesValue.AdditionalProperty(
+            key=name, value=to_json_value(value, with_type=with_type)))
+
+  def _get_outputs(self):
+    """Returns a list of all output labels for a step."""
+    outputs = []
+    for p in self.proto.properties.additionalProperties:
+      if p.key == PropertyNames.OUTPUT_INFO:
+        for entry in p.value.array_value.entries:
+          for entry_prop in entry.object_value.properties:
+            if entry_prop.key == PropertyNames.OUTPUT_NAME:
+              outputs.append(entry_prop.value.string_value)
+    return outputs
+
+  def __reduce__(self):
+    """Reduce hook for pickling the Step class more easily."""
+    return (Step, (self.step_kind, self.step_name, self._additional_properties))
+
+  def get_output(self, tag=None):
+    """Returns name if it is one of the outputs or first output if name is None.
+
+    Args:
+      tag: tag of the output as a string or None if we want to get the
+        name of the first output.
+
+    Returns:
+      The name of the output associated with the tag or the first output
+      if tag was None.
+
+    Raises:
+      ValueError: if the tag does not exist within outputs.
+    """
+    outputs = self._get_outputs()
+    if tag is None:
+      return outputs[0]
+    else:
+      name = '%s_%s' % (PropertyNames.OUT, tag)
+      if name not in outputs:
+        raise ValueError(
+            'Cannot find named output: %s in %s.' % (name, outputs))
+      return name
+
+
+class Environment(object):
+  """Wrapper for a dataflow Environment protobuf."""
+
+  def __init__(self, packages, options, environment_version):
+    self.standard_options = options.view_as(StandardOptions)
+    self.google_cloud_options = options.view_as(GoogleCloudOptions)
+    self.worker_options = options.view_as(WorkerOptions)
+    self.debug_options = options.view_as(DebugOptions)
+    self.proto = dataflow.Environment()
+    self.proto.clusterManagerApiService = GoogleCloudOptions.COMPUTE_API_SERVICE
+    self.proto.dataset = '{}/cloud_dataflow'.format(
+        GoogleCloudOptions.BIGQUERY_API_SERVICE)
+    self.proto.tempStoragePrefix = (
+        self.google_cloud_options.temp_location.replace(
+            'gs:/',
+            GoogleCloudOptions.STORAGE_API_SERVICE))
+    # User agent information.
+    self.proto.userAgent = dataflow.Environment.UserAgentValue()
+    self.local = 'localhost' in self.google_cloud_options.dataflow_endpoint
+
+    if self.google_cloud_options.service_account_email:
+      self.proto.serviceAccountEmail = (
+          self.google_cloud_options.service_account_email)
+
+    sdk_name, version_string = get_sdk_name_and_version()
+
+    self.proto.userAgent.additionalProperties.extend([
+        dataflow.Environment.UserAgentValue.AdditionalProperty(
+            key='name',
+            value=to_json_value(sdk_name)),
+        dataflow.Environment.UserAgentValue.AdditionalProperty(
+            key='version', value=to_json_value(version_string))])
+    # Version information.
+    self.proto.version = dataflow.Environment.VersionValue()
+    if self.standard_options.streaming:
+      job_type = 'PYTHON_STREAMING'
+    else:
+      job_type = 'PYTHON_BATCH'
+    self.proto.version.additionalProperties.extend([
+        dataflow.Environment.VersionValue.AdditionalProperty(
+            key='job_type',
+            value=to_json_value(job_type)),
+        dataflow.Environment.VersionValue.AdditionalProperty(
+            key='major', value=to_json_value(environment_version))])
+    # Experiments
+    if self.debug_options.experiments:
+      for experiment in self.debug_options.experiments:
+        self.proto.experiments.append(experiment)
+    # Worker pool(s) information.
+    package_descriptors = []
+    for package in packages:
+      package_descriptors.append(
+          dataflow.Package(
+              location='%s/%s' % (
+                  self.google_cloud_options.staging_location.replace(
+                      'gs:/', GoogleCloudOptions.STORAGE_API_SERVICE),
+                  package),
+              name=package))
+
+    pool = dataflow.WorkerPool(
+        kind='local' if self.local else 'harness',
+        packages=package_descriptors,
+        taskrunnerSettings=dataflow.TaskRunnerSettings(
+            parallelWorkerSettings=dataflow.WorkerSettings(
+                baseUrl=GoogleCloudOptions.DATAFLOW_ENDPOINT,
+                servicePath=self.google_cloud_options.dataflow_endpoint)))
+    pool.autoscalingSettings = dataflow.AutoscalingSettings()
+    # Set worker pool options received through command line.
+    if self.worker_options.num_workers:
+      pool.numWorkers = self.worker_options.num_workers
+    if self.worker_options.max_num_workers:
+      pool.autoscalingSettings.maxNumWorkers = (
+          self.worker_options.max_num_workers)
+    if self.worker_options.autoscaling_algorithm:
+      values_enum = dataflow.AutoscalingSettings.AlgorithmValueValuesEnum
+      pool.autoscalingSettings.algorithm = {
+          'NONE': values_enum.AUTOSCALING_ALGORITHM_NONE,
+          'THROUGHPUT_BASED': values_enum.AUTOSCALING_ALGORITHM_BASIC,
+      }.get(self.worker_options.autoscaling_algorithm)
+    if self.worker_options.machine_type:
+      pool.machineType = self.worker_options.machine_type
+    if self.worker_options.disk_size_gb:
+      pool.diskSizeGb = self.worker_options.disk_size_gb
+    if self.worker_options.disk_type:
+      pool.diskType = self.worker_options.disk_type
+    if self.worker_options.zone:
+      pool.zone = self.worker_options.zone
+    if self.worker_options.network:
+      pool.network = self.worker_options.network
+    if self.worker_options.worker_harness_container_image:
+      pool.workerHarnessContainerImage = (
+          self.worker_options.worker_harness_container_image)
+    else:
+      # Default to using the worker harness container image for the current SDK
+      # version.
+      pool.workerHarnessContainerImage = (
+          'dataflow.gcr.io/v1beta3/python:%s' %
+          get_required_container_version())
+    if self.worker_options.use_public_ips is not None:
+      if self.worker_options.use_public_ips:
+        pool.ipConfiguration = (
+            dataflow.WorkerPool
+            .IpConfigurationValueValuesEnum.WORKER_IP_PUBLIC)
+      else:
+        pool.ipConfiguration = (
+            dataflow.WorkerPool
+            .IpConfigurationValueValuesEnum.WORKER_IP_PRIVATE)
+
+    if self.standard_options.streaming:
+      # Use separate data disk for streaming.
+      disk = dataflow.Disk()
+      if self.local:
+        disk.diskType = 'local'
+      # TODO(ccy): allow customization of disk.
+      pool.dataDisks.append(disk)
+    self.proto.workerPools.append(pool)
+
+    sdk_pipeline_options = options.get_all_options()
+    if sdk_pipeline_options:
+      self.proto.sdkPipelineOptions = (
+          dataflow.Environment.SdkPipelineOptionsValue())
+
+      options_dict = {k: v
+                      for k, v in sdk_pipeline_options.iteritems()
+                      if v is not None}
+      self.proto.sdkPipelineOptions.additionalProperties.append(
+          dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
+              key='options', value=to_json_value(options_dict)))
+
+      dd = DisplayData.create_from_options(options)
+      items = [item.get_dict() for item in dd.items]
+      self.proto.sdkPipelineOptions.additionalProperties.append(
+          dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
+              key='display_data', value=to_json_value(items)))
+
+
+class Job(object):
+  """Wrapper for a dataflow Job protobuf."""
+
+  def __str__(self):
+    def encode_shortstrings(input_buffer, errors='strict'):
+      """Encoder (from Unicode) that suppresses long base64 strings."""
+      original_len = len(input_buffer)
+      if original_len > 150:
+        if self.base64_str_re.match(input_buffer):
+          input_buffer = '<string of %d bytes>' % original_len
+          input_buffer = input_buffer.encode('ascii', errors=errors)
+        else:
+          matched = self.coder_str_re.match(input_buffer)
+          if matched:
+            input_buffer = '%s<string of %d bytes>' % (
+                matched.group(1), matched.end(2) - matched.start(2))
+            input_buffer = input_buffer.encode('ascii', errors=errors)
+      return input_buffer, original_len
+
+    def decode_shortstrings(input_buffer, errors='strict'):
+      """Decoder (to Unicode) that suppresses long base64 strings."""
+      shortened, length = encode_shortstrings(input_buffer, errors)
+      return unicode(shortened), length
+
+    def shortstrings_registerer(encoding_name):
+      if encoding_name == 'shortstrings':
+        return codecs.CodecInfo(name='shortstrings',
+                                encode=encode_shortstrings,
+                                decode=decode_shortstrings)
+      return None
+
+    codecs.register(shortstrings_registerer)
+
+    # Use json "dump string" method to get readable formatting;
+    # further modify it to not output too-long strings, aimed at the
+    # 10,000+ character hex-encoded "serialized_fn" values.
+    return json.dumps(
+        json.loads(encoding.MessageToJson(self.proto), encoding='shortstrings'),
+        indent=2, sort_keys=True)
+
+  @staticmethod
+  def default_job_name(job_name):
+    if job_name is None:
+      user_name = getpass.getuser().lower()
+      date_component = datetime.utcnow().strftime('%m%d%H%M%S-%f')
+      app_name = 'beamapp'
+      job_name = '{}-{}-{}'.format(app_name, user_name, date_component)
+    return job_name
+
+  def __init__(self, options):
+    self.options = options
+    self.google_cloud_options = options.view_as(GoogleCloudOptions)
+    if not self.google_cloud_options.job_name:
+      self.google_cloud_options.job_name = self.default_job_name(
+          self.google_cloud_options.job_name)
+
+    required_google_cloud_options = ['project', 'job_name', 'temp_location']
+    missing = [
+        option for option in required_google_cloud_options
+        if not getattr(self.google_cloud_options, option)]
+    if missing:
+      raise ValueError(
+          'Missing required configuration parameters: %s' % missing)
+
+    if not self.google_cloud_options.staging_location:
+      logging.info('Defaulting to the temp_location as staging_location: %s',
+                   self.google_cloud_options.temp_location)
+      (self.google_cloud_options
+       .staging_location) = self.google_cloud_options.temp_location
+
+    # Make the staging and temp locations job-name and time specific. This is
+    # needed to avoid clashes between job submissions using the same staging
+    # area or team members using the same job names. This method is not
+    # entirely foolproof since two job submissions with the same name can
+    # happen at exactly the same time. However, the window is extremely small
+    # given that time.time() has at least microsecond granularity. We add the
+    # suffix only for GCS staging locations, where the potential for such
+    # clashes is high.
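+    # For example, with job_name 'myjob' and staging_location
+    # 'gs://my-bucket/staging', the staged path becomes something like
+    # 'gs://my-bucket/staging/myjob.1487813382.123456' (illustrative values).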
+    if self.google_cloud_options.staging_location.startswith('gs://'):
+      path_suffix = '%s.%f' % (self.google_cloud_options.job_name, time.time())
+      self.google_cloud_options.staging_location = utils.path.join(
+          self.google_cloud_options.staging_location, path_suffix)
+      self.google_cloud_options.temp_location = utils.path.join(
+          self.google_cloud_options.temp_location, path_suffix)
+    self.proto = dataflow.Job(name=self.google_cloud_options.job_name)
+    if self.options.view_as(StandardOptions).streaming:
+      self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING
+    else:
+      self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_BATCH
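+    # base64_str_re matches plain base64-looking payloads, while coder_str_re
+    # matches values of the form 'SomeCoder$<base64 payload>' (the coder name
+    # here is illustrative); both are used by __str__ to shorten very long
+    # encoded strings in the JSON dump.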
+    self.base64_str_re = re.compile(r'^[A-Za-z0-9+/]*=*$')
+    self.coder_str_re = re.compile(r'^([A-Za-z]+\$)([A-Za-z0-9+/]*=*)$')
+
+  def json(self):
+    return encoding.MessageToJson(self.proto)
+
+  def __reduce__(self):
+    """Reduce hook for pickling the Job class more easily."""
+    return (Job, (self.options,))
+
+
+class DataflowApplicationClient(object):
+  """A Dataflow API client used by application code to create and query jobs."""
+
+  def __init__(self, options, environment_version):
+    """Initializes a Dataflow API client object."""
+    self.standard_options = options.view_as(StandardOptions)
+    self.google_cloud_options = options.view_as(GoogleCloudOptions)
+    self.environment_version = environment_version
+    if self.google_cloud_options.no_auth:
+      credentials = None
+    else:
+      credentials = get_service_credentials()
+    self._client = dataflow.DataflowV1b3(
+        url=self.google_cloud_options.dataflow_endpoint,
+        credentials=credentials,
+        get_credentials=(not self.google_cloud_options.no_auth))
+    self._storage_client = storage.StorageV1(
+        url='https://www.googleapis.com/storage/v1',
+        credentials=credentials,
+        get_credentials=(not self.google_cloud_options.no_auth))
+
+  # TODO(silviuc): Refactor so that retry logic can be applied.
+  @retry.no_retries  # Using no_retries marks this as an integration point.
+  def _gcs_file_copy(self, from_path, to_path):
+    to_folder, to_name = os.path.split(to_path)
+    with open(from_path, 'rb') as f:
+      self.stage_file(to_folder, to_name, f)
+
+  def stage_file(self, gcs_or_local_path, file_name, stream,
+                 mime_type='application/octet-stream'):
+    """Stages a file at a GCS or local path with stream-supplied contents."""
+    if not gcs_or_local_path.startswith('gs://'):
+      local_path = os.path.join(gcs_or_local_path, file_name)
+      logging.info('Staging file locally to %s', local_path)
+      with open(local_path, 'wb') as f:
+        f.write(stream.read())
+      return
+    gcs_location = gcs_or_local_path + '/' + file_name
+    bucket, name = gcs_location[5:].split('/', 1)
+
+    request = storage.StorageObjectsInsertRequest(
+        bucket=bucket, name=name)
+    logging.info('Starting GCS upload to %s...', gcs_location)
+    upload = storage.Upload(stream, mime_type)
+    try:
+      response = self._storage_client.objects.Insert(request, upload=upload)
+    except exceptions.HttpError as e:
+      reportable_errors = {
+          403: 'access denied',
+          404: 'bucket not found',
+      }
+      if e.status_code in reportable_errors:
+        raise IOError(('Could not upload to GCS path %s: %s. Please verify '
+                       'that credentials are valid and that you have write '
+                       'access to the specified path. Stale credentials can be '
+                       'refreshed by executing "gcloud auth login".') %
+                      (gcs_or_local_path, reportable_errors[e.status_code]))
+      raise
+    logging.info('Completed GCS upload to %s', gcs_location)
+    return response
+
+  # TODO(silviuc): Refactor so that retry logic can be applied.
+  @retry.no_retries  # Using no_retries marks this as an integration point.
+  def create_job(self, job):
+    """Creates job description. May stage and/or submit for remote execution."""
+    self.create_job_description(job)
+
+    # Stage and submit the job when necessary
+    dataflow_job_file = job.options.view_as(DebugOptions).dataflow_job_file
+    template_location = (
+        job.options.view_as(GoogleCloudOptions).template_location)
+
+    job_location = template_location or dataflow_job_file
+    if job_location:
+      gcs_or_local_path = os.path.dirname(job_location)
+      file_name = os.path.basename(job_location)
+      self.stage_file(gcs_or_local_path, file_name, StringIO(job.json()))
+
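+    # When a template_location is given, the job is only staged as a template
+    # and is not submitted for execution.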
+    if not template_location:
+      return self.submit_job_description(job)
+    else:
+      return None
+
+  def create_job_description(self, job):
+    """Creates a job described by the workflow proto."""
+    resources = dependency.stage_job_resources(
+        job.options, file_copy=self._gcs_file_copy)
+    job.proto.environment = Environment(
+        packages=resources, options=job.options,
+        environment_version=self.environment_version).proto
+    # TODO(silviuc): Remove the debug logging eventually.
+    logging.info('JOB: %s', job)
+
+  def submit_job_description(self, job):
+    """Creates and excutes a job request."""
+    request = dataflow.DataflowProjectsJobsCreateRequest()
+    request.projectId = self.google_cloud_options.project
+    request.job = job.proto
+
+    try:
+      response = self._client.projects_jobs.Create(request)
+    except exceptions.BadStatusCodeError as e:
+      logging.error('HTTP status %d trying to create job'
+                    ' at dataflow service endpoint %s',
+                    e.response.status,
+                    self.google_cloud_options.dataflow_endpoint)
+      logging.fatal('details of server error: %s', e)
+      raise
+    logging.info('Create job: %s', response)
+    # The response is a Job proto with the id for the new job.
+    logging.info('Created job with id: [%s]', response.id)
+    logging.info(
+        'To access the Dataflow monitoring console, please navigate to '
+        'https://console.developers.google.com/project/%s/dataflow/job/%s',
+        self.google_cloud_options.project, response.id)
+
+    return response
+
+  @retry.with_exponential_backoff()  # Using retry defaults from utils/retry.py
+  def modify_job_state(self, job_id, new_state):
+    """Modify the run state of the job.
+
+    Args:
+      job_id: The id of the job.
+      new_state: A string representing the new desired state. It can be set
+        to 'JOB_STATE_DONE', 'JOB_STATE_CANCELLED', or 'JOB_STATE_DRAINING'.
+
+    Returns:
+      True if the job was modified successfully.
+    """
+    if new_state == 'JOB_STATE_DONE':
+      new_state = dataflow.Job.RequestedStateValueValuesEnum.JOB_STATE_DONE
+    elif new_state == 'JOB_STATE_CANCELLED':
+      new_state = dataflow.Job.RequestedStateValueValuesEnum.JOB_STATE_CANCELLED
+    elif new_state == 'JOB_STATE_DRAINING':
+      new_state = dataflow.Job.RequestedStateValueValuesEnum.JOB_STATE_DRAINING
+    else:
+      # Other states can only be set by the service.
+      return False
+
+    request = dataflow.DataflowProjectsJobsUpdateRequest()
+    request.jobId = job_id
+    request.projectId = self.google_cloud_options.project
+    request.job = dataflow.Job(requestedState=new_state)
+
+    self._client.projects_jobs.Update(request)
+    return True
+
+  @retry.with_exponential_backoff()  # Using retry defaults from utils/retry.py
+  def get_job(self, job_id):
+    """Gets the job status for a submitted job.
+
+    Args:
+      job_id: A string representing the job_id for the workflow as returned
+        by a create_job() request.
+
+    Returns:
+      A Job proto. See below for interesting fields.
+
+    The Job proto returned from a get_job() request contains some interesting
+    fields:
+      currentState: An object representing the current state of the job. The
+        string representation of the object (str() result) has the following
+        possible values: JOB_STATE_UNKNOWN, JOB_STATE_STOPPED,
+        JOB_STATE_RUNNING, JOB_STATE_DONE, JOB_STATE_FAILED,
+        JOB_STATE_CANCELLED.
+      createTime: UTC time when the job was created
+        (e.g. '2015-03-10T00:01:53.074Z')
+      currentStateTime: UTC time for the current state of the job.
+    """
+    request = dataflow.DataflowProjectsJobsGetRequest()
+    request.jobId = job_id
+    request.projectId = self.google_cloud_options.project
+    response = self._client.projects_jobs.Get(request)
+    return response
+
+  @retry.with_exponential_backoff()  # Using retry defaults from utils/retry.py
+  def list_messages(
+      self, job_id, start_time=None, end_time=None, page_token=None,
+      minimum_importance=None):
+    """List messages associated with the execution of a job.
+
+    Args:
+      job_id: A string representing the job_id for the workflow as returned
+        by a create_job() request.
+      start_time: If specified, only messages generated after the start time
+        will be returned; otherwise all messages since the job started will be
+        returned. The value is a string representing UTC time
+        (e.g., '2015-08-18T21:03:50.644Z')
+      end_time: If specified, only messages generated before the end time
+        will be returned; otherwise all messages up to the current time will be
+        returned. The value is a string representing UTC time
+        (e.g., '2015-08-18T21:03:50.644Z')
+      page_token: A string to be used as next page token if the list call
+        returned paginated results.
+      minimum_importance: Filter for messages based on importance. The possible
+        string values in increasing order of importance are: JOB_MESSAGE_DEBUG,
+        JOB_MESSAGE_DETAILED, JOB_MESSAGE_BASIC, JOB_MESSAGE_WARNING,
+        JOB_MESSAGE_ERROR. For example, a filter set to JOB_MESSAGE_WARNING
+        will return only warnings and errors and exclude all others.
+
+    Returns:
+      A tuple consisting of a list of JobMessage instances and a
+      next page token string.
+
+    Raises:
+      RuntimeError: if an unexpected value for the minimum_importance argument
+        is used.
+
+    The JobMessage objects returned by the call contain the following fields:
+      id: A unique string identifier for the message.
+      time: A string representing the UTC time of the message
+        (e.g., '2015-08-18T21:03:50.644Z')
+      messageImportance: An enumeration value for the message importance. When
+        converted to a string, it will have one of the following values:
+        JOB_MESSAGE_DEBUG, JOB_MESSAGE_DETAILED, JOB_MESSAGE_BASIC,
+        JOB_MESSAGE_WARNING, JOB_MESSAGE_ERROR.
+      messageText: A message string.
+    """
+    request = dataflow.DataflowProjectsJobsMessagesListRequest(
+        jobId=job_id, projectId=self.google_cloud_options.project)
+    if page_token is not None:
+      request.pageToken = page_token
+    if start_time is not None:
+      request.startTime = start_time
+    if end_time is not None:
+      request.endTime = end_time
+    if minimum_importance is not None:
+      if minimum_importance == 'JOB_MESSAGE_DEBUG':
+        request.minimumImportance = (
+            dataflow.DataflowProjectsJobsMessagesListRequest
+            .MinimumImportanceValueValuesEnum
+            .JOB_MESSAGE_DEBUG)
+      elif minimum_importance == 'JOB_MESSAGE_DETAILED':
+        request.minimumImportance = (
+            dataflow.DataflowProjectsJobsMessagesListRequest
+            .MinimumImportanceValueValuesEnum
+            .JOB_MESSAGE_DETAILED)
+      elif minimum_importance == 'JOB_MESSAGE_BASIC':
+        request.minimumImportance = (
+            dataflow.DataflowProjectsJobsMessagesListRequest
+            .MinimumImportanceValueValuesEnum
+            .JOB_MESSAGE_BASIC)
+      elif minimum_importance == 'JOB_MESSAGE_WARNING':
+        request.minimumImportance = (
+            dataflow.DataflowProjectsJobsMessagesListRequest
+            .MinimumImportanceValueValuesEnum
+            .JOB_MESSAGE_WARNING)
+      elif minimum_importance == 'JOB_MESSAGE_ERROR':
+        request.minimumImportance = (
+            dataflow.DataflowProjectsJobsMessagesListRequest
+            .MinimumImportanceValueValuesEnum
+            .JOB_MESSAGE_ERROR)
+      else:
+        raise RuntimeError(
+            'Unexpected value for minimum_importance argument: %r' %
+            minimum_importance)
+    response = self._client.projects_jobs_messages.List(request)
+    return response.jobMessages, response.nextPageToken
+
+
+class MetricUpdateTranslators(object):
+  """Translators between accumulators and dataflow metric updates."""
+
+  @staticmethod
+  def translate_boolean(accumulator, metric_update_proto):
+    metric_update_proto.boolean = accumulator.value
+
+  @staticmethod
+  def translate_scalar_mean_int(accumulator, metric_update_proto):
+    if accumulator.count:
+      metric_update_proto.integerMean = dataflow.IntegerMean()
+      metric_update_proto.integerMean.sum = to_split_int(accumulator.sum)
+      metric_update_proto.integerMean.count = to_split_int(accumulator.count)
+    else:
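+      # Nothing has been accumulated yet; clear the kind so the service
+      # ignores this update (a count of zero would otherwise raise an error).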
+      metric_update_proto.nameAndKind.kind = None
+
+  @staticmethod
+  def translate_scalar_mean_float(accumulator, metric_update_proto):
+    if accumulator.count:
+      metric_update_proto.floatingPointMean = dataflow.FloatingPointMean()
+      metric_update_proto.floatingPointMean.sum = accumulator.sum
+      metric_update_proto.floatingPointMean.count = to_split_int(
+          accumulator.count)
+    else:
+      metric_update_proto.nameAndKind.kind = None
+
+  @staticmethod
+  def translate_scalar_counter_int(accumulator, metric_update_proto):
+    metric_update_proto.integer = to_split_int(accumulator.value)
+
+  @staticmethod
+  def translate_scalar_counter_float(accumulator, metric_update_proto):
+    metric_update_proto.floatingPoint = accumulator.value
+
+
+def to_split_int(n):
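+  # Split a (possibly 64-bit) integer into the 32-bit halves expected by the
+  # SplitInt64 proto; e.g. (1 << 32) + 5 becomes lowBits=5, highBits=1.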
+  res = dataflow.SplitInt64()
+  res.lowBits = n & 0xffffffff
+  res.highBits = n >> 32
+  return res
+
+
+def translate_distribution(distribution_update, metric_update_proto):
+  """Translate metrics DistributionUpdate to dataflow distribution update."""
+  dist_update_proto = dataflow.DistributionUpdate()
+  dist_update_proto.min = to_split_int(distribution_update.min)
+  dist_update_proto.max = to_split_int(distribution_update.max)
+  dist_update_proto.count = to_split_int(distribution_update.count)
+  dist_update_proto.sum = to_split_int(distribution_update.sum)
+  metric_update_proto.distribution = dist_update_proto
+
+
+def translate_value(value, metric_update_proto):
+  metric_update_proto.integer = to_split_int(value)
+
+
+def translate_scalar(accumulator, metric_update):
+  metric_update.scalar = to_json_value(accumulator.value, with_type=True)
+
+
+def translate_mean(accumulator, metric_update):
+  if accumulator.count:
+    metric_update.meanSum = to_json_value(accumulator.sum, with_type=True)
+    metric_update.meanCount = to_json_value(accumulator.count, with_type=True)
+  else:
+    # A denominator of 0 will raise an error in the service.
+    # What it means is we have nothing to report yet, so don't.
+    metric_update.kind = None
+
+
+# To enable a counter on the service, add it to this dictionary.
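+# Each entry maps an accumulator combine function to its metric kind and the
+# translator used to serialize the accumulated value (e.g. MeanInt64Fn is
+# reported as a 'mean' via translate_mean).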
+metric_translations = {
+    cy_combiners.CountCombineFn: ('sum', translate_scalar),
+    cy_combiners.SumInt64Fn: ('sum', translate_scalar),
+    cy_combiners.MinInt64Fn: ('min', translate_scalar),
+    cy_combiners.MaxInt64Fn: ('max', translate_scalar),
+    cy_combiners.MeanInt64Fn: ('mean', translate_mean),
+    cy_combiners.SumFloatFn: ('sum', translate_scalar),
+    cy_combiners.MinFloatFn: ('min', translate_scalar),
+    cy_combiners.MaxFloatFn: ('max', translate_scalar),
+    cy_combiners.MeanFloatFn: ('mean', translate_mean),
+    cy_combiners.AllCombineFn: ('and', translate_scalar),
+    cy_combiners.AnyCombineFn: ('or', translate_scalar),
+}
+
+counter_translations = {
+    cy_combiners.CountCombineFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.SUM,
+        MetricUpdateTranslators.translate_scalar_counter_int),
+    cy_combiners.SumInt64Fn: (
+        dataflow.NameAndKind.KindValueValuesEnum.SUM,
+        MetricUpdateTranslators.translate_scalar_counter_int),
+    cy_combiners.MinInt64Fn: (
+        dataflow.NameAndKind.KindValueValuesEnum.MIN,
+        MetricUpdateTranslators.translate_scalar_counter_int),
+    cy_combiners.MaxInt64Fn: (
+        dataflow.NameAndKind.KindValueValuesEnum.MAX,
+        MetricUpdateTranslators.translate_scalar_counter_int),
+    cy_combiners.MeanInt64Fn: (
+        dataflow.NameAndKind.KindValueValuesEnum.MEAN,
+        MetricUpdateTranslators.translate_scalar_mean_int),
+    cy_combiners.SumFloatFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.SUM,
+        MetricUpdateTranslators.translate_scalar_counter_float),
+    cy_combiners.MinFloatFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.MIN,
+        MetricUpdateTranslators.translate_scalar_counter_float),
+    cy_combiners.MaxFloatFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.MAX,
+        MetricUpdateTranslators.translate_scalar_counter_float),
+    cy_combiners.MeanFloatFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.MEAN,
+        MetricUpdateTranslators.translate_scalar_mean_float),
+    cy_combiners.AllCombineFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.AND,
+        MetricUpdateTranslators.translate_boolean),
+    cy_combiners.AnyCombineFn: (
+        dataflow.NameAndKind.KindValueValuesEnum.OR,
+        MetricUpdateTranslators.translate_boolean),
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
new file mode 100644
index 0000000..d60c7a5
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -0,0 +1,96 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Unit tests for the apiclient module."""
+
+import unittest
+
+from mock import Mock
+
+from apache_beam.metrics.cells import DistributionData
+from apache_beam.utils.pipeline_options import PipelineOptions
+
+from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
+from apache_beam.runners.dataflow.internal.clients import dataflow
+
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apache_beam.runners.dataflow.internal import apiclient
+except ImportError:
+  apiclient = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+@unittest.skipIf(apiclient is None, 'GCP dependencies are not installed')
+class UtilTest(unittest.TestCase):
+
+  @unittest.skip("Enable once BEAM-1080 is fixed.")
+  def test_create_application_client(self):
+    pipeline_options = PipelineOptions()
+    apiclient.DataflowApplicationClient(
+        pipeline_options,
+        DataflowRunner.BATCH_ENVIRONMENT_MAJOR_VERSION)
+
+  def test_default_job_name(self):
+    job_name = apiclient.Job.default_job_name(None)
+    regexp = 'beamapp-.*-[0-9]{10}-[0-9]{6}'
+    self.assertRegexpMatches(job_name, regexp)
+
+  def test_split_int(self):
+    number = 12345
+    split_number = apiclient.to_split_int(number)
+    self.assertEqual((split_number.lowBits, split_number.highBits),
+                     (number, 0))
+    shift_number = number << 32
+    split_number = apiclient.to_split_int(shift_number)
+    self.assertEqual((split_number.lowBits, split_number.highBits),
+                     (0, number))
+
+  def test_translate_distribution(self):
+    metric_update = dataflow.CounterUpdate()
+    distribution_update = DistributionData(16, 2, 1, 15)
+    apiclient.translate_distribution(distribution_update, metric_update)
+    self.assertEqual(metric_update.distribution.min.lowBits,
+                     distribution_update.min)
+    self.assertEqual(metric_update.distribution.max.lowBits,
+                     distribution_update.max)
+    self.assertEqual(metric_update.distribution.sum.lowBits,
+                     distribution_update.sum)
+    self.assertEqual(metric_update.distribution.count.lowBits,
+                     distribution_update.count)
+
+  def test_translate_means(self):
+    metric_update = dataflow.CounterUpdate()
+    accumulator = Mock()
+    accumulator.sum = 16
+    accumulator.count = 2
+    apiclient.MetricUpdateTranslators.translate_scalar_mean_int(accumulator,
+                                                                metric_update)
+    self.assertEqual(metric_update.integerMean.sum.lowBits, accumulator.sum)
+    self.assertEqual(metric_update.integerMean.count.lowBits, accumulator.count)
+
+    accumulator.sum = 16.0
+    accumulator.count = 2
+    apiclient.MetricUpdateTranslators.translate_scalar_mean_float(accumulator,
+                                                                  metric_update)
+    self.assertEqual(metric_update.floatingPointMean.sum, accumulator.sum)
+    self.assertEqual(
+        metric_update.floatingPointMean.count.lowBits, accumulator.count)
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/runners/dataflow/internal/clients/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/clients/__init__.py b/sdks/python/apache_beam/runners/dataflow/internal/clients/__init__.py
new file mode 100644
index 0000000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/internal/clients/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/beam/blob/59ad58ac/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/__init__.py b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/__init__.py
new file mode 100644
index 0000000..c0d20c3
--- /dev/null
+++ b/sdks/python/apache_beam/runners/dataflow/internal/clients/dataflow/__init__.py
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Common imports for generated dataflow client library."""
+# pylint:disable=wildcard-import
+
+import pkgutil
+
+# Protect against environments where apitools library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apitools.base.py import *
+  from apache_beam.runners.dataflow.internal.clients.dataflow.dataflow_v1b3_messages import *
+  from apache_beam.runners.dataflow.internal.clients.dataflow.dataflow_v1b3_client import *
+except ImportError:
+  pass
+# pylint: enable=wrong-import-order, wrong-import-position
+
+__path__ = pkgutil.extend_path(__path__, __name__)

