beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [01/50] [abbrv] beam git commit: [BEAM-1964] Fix lint issues for linter upgrade -2
Date Tue, 25 Apr 2017 17:29:56 GMT
Repository: beam
Updated Branches:
  refs/heads/jstorm-runner f6a89b0fc -> 58d4b97c0


[BEAM-1964] Fix lint issues for linter upgrade -2


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bf474a0b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bf474a0b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bf474a0b

Branch: refs/heads/jstorm-runner
Commit: bf474a0b72beb2e946be39ce04e3f07800a3b307
Parents: cf9ac45
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Thu Apr 13 17:19:56 2017 -0700
Committer: Ahmet Altay <altay@altay-macbookpro2.roam.corp.google.com>
Committed: Fri Apr 14 13:06:14 2017 -0700

----------------------------------------------------------------------
 .../io/gcp/datastore/v1/datastoreio.py          |  4 +--
 .../apache_beam/io/gcp/datastore/v1/helper.py   | 16 ++++-------
 .../io/gcp/datastore/v1/query_splitter.py       |  2 +-
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py |  3 +-
 .../io/gcp/tests/bigquery_matcher.py            |  3 +-
 sdks/python/apache_beam/metrics/cells.py        | 28 +++++++++----------
 sdks/python/apache_beam/metrics/execution.py    |  3 +-
 sdks/python/apache_beam/metrics/metric.py       |  9 ++----
 sdks/python/apache_beam/runners/common.py       |  9 ++----
 .../runners/dataflow/dataflow_metrics_test.py   |  3 +-
 .../runners/dataflow/dataflow_runner.py         |  6 ++--
 .../runners/dataflow/internal/apiclient.py      |  8 +++---
 .../runners/dataflow/internal/dependency.py     |  6 ++--
 .../runners/dataflow/test_dataflow_runner.py    |  4 ---
 .../runners/direct/bundle_factory.py            | 14 ++++------
 .../runners/direct/evaluation_context.py        | 10 +++----
 .../apache_beam/runners/direct/executor.py      |  9 +-----
 .../runners/direct/transform_evaluator.py       |  7 -----
 sdks/python/apache_beam/runners/runner.py       |  3 +-
 .../apache_beam/tests/pipeline_verifiers.py     |  7 ++---
 sdks/python/apache_beam/transforms/combiners.py | 29 +++++++-------------
 .../apache_beam/transforms/combiners_test.py    |  4 +--
 sdks/python/apache_beam/typehints/decorators.py |  3 +-
 sdks/python/apache_beam/typehints/typehints.py  |  3 +-
 24 files changed, 68 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
index e8ca05d..d9b3598 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
@@ -253,7 +253,7 @@ class ReadFromDatastore(PTransform):
     query = helper.make_latest_timestamp_query(namespace)
     req = helper.make_request(project, namespace, query)
     resp = datastore.run_query(req)
-    if len(resp.batch.entity_results) == 0:
+    if not resp.batch.entity_results:
       raise RuntimeError("Datastore total statistics unavailable.")
 
     entity = resp.batch.entity_results[0].entity
@@ -281,7 +281,7 @@ class ReadFromDatastore(PTransform):
 
     req = helper.make_request(project, namespace, kind_stats_query)
     resp = datastore.run_query(req)
-    if len(resp.batch.entity_results) == 0:
+    if not resp.batch.entity_results:
       raise RuntimeError("Datastore statistics for kind %s unavailable" % kind)
 
     entity = resp.batch.entity_results[0].entity

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
index b1ef9af..d544226 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
@@ -62,8 +62,7 @@ def key_comparator(k1, k2):
   k2_path = next(k2_iter, None)
   if k2_path:
     return -1
-  else:
-    return 0
+  return 0
 
 
 def compare_path(p1, p2):
@@ -99,8 +98,7 @@ def str_compare(s1, s2):
     return 0
   elif s1 < s2:
     return -1
-  else:
-    return 1
+  return 1
 
 
 def get_datastore(project):
@@ -131,13 +129,9 @@ def make_partition(project, namespace):
 def retry_on_rpc_error(exception):
   """A retry filter for Cloud Datastore RPCErrors."""
   if isinstance(exception, RPCError):
-    if exception.code >= 500:
-      return True
-    else:
-      return False
-  else:
-    # TODO(vikasrk): Figure out what other errors should be retried.
-    return False
+    return exception.code >= 500
+  # TODO(vikasrk): Figure out what other errors should be retried.
+  return False
 
 
 def fetch_entities(project, namespace, query, datastore):

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py
index 8ced170..d5674f9 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/query_splitter.py
@@ -97,7 +97,7 @@ def _validate_query(query):
   if len(query.kind) != 1:
     raise ValueError('Query must have exactly one kind.')
 
-  if len(query.order) != 0:
+  if query.order:
     raise ValueError('Query cannot have any sort orders.')
 
   if query.HasField('limit'):

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index b2bc809..a10a3d2 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -93,8 +93,7 @@ class GCSFileSystem(FileSystem):
     raw_file = gcsio.GcsIO().open(path, mode, mime_type=mime_type)
     if compression_type == CompressionTypes.UNCOMPRESSED:
       return raw_file
-    else:
-      return CompressedFile(raw_file, compression_type=compression_type)
+    return CompressedFile(raw_file, compression_type=compression_type)
 
   def create(self, path, mime_type='application/octet-stream',
              compression_type=CompressionTypes.AUTO):

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
index cc26689..66d99b3 100644
--- a/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
+++ b/sdks/python/apache_beam/io/gcp/tests/bigquery_matcher.py
@@ -38,8 +38,7 @@ MAX_RETRIES = 4
 
 def retry_on_http_and_value_error(exception):
   """Filter allowing retries on Bigquery errors and value error."""
-  return isinstance(exception, GoogleCloudError) or \
-          isinstance(exception, ValueError)
+  return isinstance(exception, (GoogleCloudError, ValueError))
 
 
 class BigqueryMatcher(BaseMatcher):

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/metrics/cells.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/cells.py b/sdks/python/apache_beam/metrics/cells.py
index 5a571f5..c421949 100644
--- a/sdks/python/apache_beam/metrics/cells.py
+++ b/sdks/python/apache_beam/metrics/cells.py
@@ -97,9 +97,8 @@ class CellCommitState(object):
     with self._lock:
       if self._state == CellCommitState.CLEAN:
         return False
-      else:
-        self._state = CellCommitState.COMMITTING
-        return True
+      self._state = CellCommitState.COMMITTING
+      return True
 
 
 class MetricCell(object):
@@ -218,8 +217,7 @@ class DistributionResult(object):
     """
     if self.data.count == 0:
       return None
-    else:
-      return float(self.data.sum)/self.data.count
+    return float(self.data.sum)/self.data.count
 
 
 class DistributionData(object):
@@ -257,16 +255,16 @@ class DistributionData(object):
   def combine(self, other):
     if other is None:
       return self
-    else:
-      new_min = (None if self.min is None and other.min is None else
-                 min(x for x in (self.min, other.min) if x is not None))
-      new_max = (None if self.max is None and other.max is None else
-                 max(x for x in (self.max, other.max) if x is not None))
-      return DistributionData(
-          self.sum + other.sum,
-          self.count + other.count,
-          new_min,
-          new_max)
+
+    new_min = (None if self.min is None and other.min is None else
+               min(x for x in (self.min, other.min) if x is not None))
+    new_max = (None if self.max is None and other.max is None else
+               max(x for x in (self.max, other.max) if x is not None))
+    return DistributionData(
+        self.sum + other.sum,
+        self.count + other.count,
+        new_min,
+        new_max)
 
   @classmethod
   def singleton(cls, value):

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/metrics/execution.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/execution.py b/sdks/python/apache_beam/metrics/execution.py
index f6c8990..887423b 100644
--- a/sdks/python/apache_beam/metrics/execution.py
+++ b/sdks/python/apache_beam/metrics/execution.py
@@ -129,8 +129,7 @@ class _MetricsEnvironment(object):
     index = len(self.PER_THREAD.container) - 1
     if index < 0:
       return None
-    else:
-      return self.PER_THREAD.container[index]
+    return self.PER_THREAD.container[index]
 
   def set_current_container(self, container):
     self.set_container_stack()

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/metrics/metric.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py
index f6a0923..33db4e1 100644
--- a/sdks/python/apache_beam/metrics/metric.py
+++ b/sdks/python/apache_beam/metrics/metric.py
@@ -103,8 +103,7 @@ class MetricResults(object):
         (filter.names and
          metric_key.metric.name in filter.names)):
       return True
-    else:
-      return False
+    return False
 
   @staticmethod
   def _matches_sub_path(actual_scope, filter_scope):
@@ -117,8 +116,7 @@ class MetricResults(object):
       return False  # The first entry was not exactly matched
     elif end_pos != len(actual_scope) and actual_scope[end_pos] != '/':
       return False  # The last entry was not exactly matched
-    else:
-      return True
+    return True
 
   @staticmethod
   def _matches_scope(filter, metric_key):
@@ -139,8 +137,7 @@ class MetricResults(object):
     if (MetricResults._matches_name(filter, metric_key) and
         MetricResults._matches_scope(filter, metric_key)):
       return True
-    else:
-      return False
+    return False
 
   def query(self, filter=None):
     raise NotImplementedError

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 2c1032d..8f86b75 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -414,10 +414,8 @@ def get_logging_context(maybe_logger, **kwargs):
     maybe_context = maybe_logger.PerThreadLoggingContext(**kwargs)
     if isinstance(maybe_context, LoggingContext):
       return maybe_context
-    else:
-      return _LoggingContextAdapter(maybe_context)
-  else:
-    return LoggingContext()
+    return _LoggingContextAdapter(maybe_context)
+  return LoggingContext()
 
 
 class _ReceiverAdapter(Receiver):
@@ -432,5 +430,4 @@ class _ReceiverAdapter(Receiver):
 def as_receiver(maybe_receiver):
   if isinstance(maybe_receiver, Receiver):
     return maybe_receiver
-  else:
-    return _ReceiverAdapter(maybe_receiver)
+  return _ReceiverAdapter(maybe_receiver)

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
index 95027a3..ffee3e5 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics_test.py
@@ -38,8 +38,7 @@ class DictToObject(object):
   def _wrap(self, value):
     if isinstance(value, (tuple, list, set, frozenset)):
       return type(value)([self._wrap(v) for v in value])
-    else:
-      return DictToObject(value) if isinstance(value, dict) else value
+    return DictToObject(value) if isinstance(value, dict) else value
 
 
 class TestDataflowMetrics(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 1a92c26..2e9fc52 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -92,8 +92,7 @@ class DataflowRunner(PipelineRunner):
         return -1
       elif 'Traceback' in msg:
         return 1
-      else:
-        return 0
+      return 0
 
     job_id = result.job_id()
     while True:
@@ -194,8 +193,7 @@ class DataflowRunner(PipelineRunner):
       return coders.WindowedValueCoder(
           coders.registry.get_coder(typehint),
           window_coder=window_coder)
-    else:
-      return coders.registry.get_coder(typehint)
+    return coders.registry.get_coder(typehint)
 
   def _get_cloud_encoding(self, coder):
     """Returns an encoding based on a coder object."""

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 6a8aa93..8d44dff 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -436,10 +436,10 @@ class DataflowApplicationClient(object):
 
     if not template_location:
       return self.submit_job_description(job)
-    else:
-      logging.info('A template was just created at location %s',
-                   template_location)
-      return None
+
+    logging.info('A template was just created at location %s',
+                 template_location)
+    return None
 
   def create_job_description(self, job):
     """Creates a job described by the workflow proto."""

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index 22de5c6..1f28b26 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -493,8 +493,7 @@ def get_sdk_name_and_version():
   container_version = get_required_container_version()
   if container_version == BEAM_CONTAINER_VERSION:
     return ('Apache Beam SDK for Python', beam_version.__version__)
-  else:
-    return ('Google Cloud Dataflow SDK for Python', container_version)
+  return ('Google Cloud Dataflow SDK for Python', container_version)
 
 
 def get_sdk_package_name():
@@ -502,8 +501,7 @@ def get_sdk_package_name():
   container_version = get_required_container_version()
   if container_version == BEAM_CONTAINER_VERSION:
     return BEAM_PACKAGE_NAME
-  else:
-    return GOOGLE_PACKAGE_NAME
+  return GOOGLE_PACKAGE_NAME
 
 
 def _download_pypi_sdk_package(temp_dir):

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index 046313a..4cf4131 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -23,10 +23,6 @@ from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
 
 
 class TestDataflowRunner(DataflowRunner):
-
-  def __init__(self):
-    super(TestDataflowRunner, self).__init__()
-
   def run(self, pipeline):
     """Execute test pipeline and verify test matcher"""
     options = pipeline.options.view_as(TestOptions)

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/direct/bundle_factory.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/bundle_factory.py b/sdks/python/apache_beam/runners/direct/bundle_factory.py
index 647b5f2..42c8095 100644
--- a/sdks/python/apache_beam/runners/direct/bundle_factory.py
+++ b/sdks/python/apache_beam/runners/direct/bundle_factory.py
@@ -127,8 +127,7 @@ class Bundle(object):
     if not self._stacked:
       if self._committed and not make_copy:
         return self._elements
-      else:
-        return list(self._elements)
+      return list(self._elements)
 
     def iterable_stacked_or_elements(elements):
       for e in elements:
@@ -140,9 +139,8 @@ class Bundle(object):
 
     if self._committed and not make_copy:
       return iterable_stacked_or_elements(self._elements)
-    else:
-      # returns a copy.
-      return [e for e in iterable_stacked_or_elements(self._elements)]
+    # returns a copy.
+    return [e for e in iterable_stacked_or_elements(self._elements)]
 
   def has_elements(self):
     return len(self._elements) > 0
@@ -171,9 +169,9 @@ class Bundle(object):
     if not self._stacked:
       self._elements.append(element)
       return
-    if (len(self._elements) > 0 and
-        (isinstance(self._elements[-1], WindowedValue) or
-         isinstance(self._elements[-1], Bundle.StackedWindowedValues)) and
+    if (self._elements and
+        (isinstance(self._elements[-1], (WindowedValue,
+                                         Bundle.StackedWindowedValues))) and
         self._elements[-1].timestamp == element.timestamp and
         self._elements[-1].windows == element.windows):
       if isinstance(self._elements[-1], WindowedValue):

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/direct/evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 8114104..2169c7c 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -281,11 +281,11 @@ class EvaluationContext(object):
     """
     if transform:
       return self._is_transform_done(transform)
-    else:
-      for applied_ptransform in self._step_names:
-        if not self._is_transform_done(applied_ptransform):
-          return False
-      return True
+
+    for applied_ptransform in self._step_names:
+      if not self._is_transform_done(applied_ptransform):
+        return False
+    return True
 
   def _is_transform_done(self, transform):
     tw = self._watermark_manager.get_watermarks(transform)

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index ce6356c..f6a1d7f 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -240,13 +240,6 @@ class _CompletionCallback(object):
         _ExecutorServiceParallelExecutor.ExecutorUpdate(None, exception))
 
 
-class _TimerCompletionCallback(_CompletionCallback):
-
-  def __init__(self, evaluation_context, all_updates, timers):
-    super(_TimerCompletionCallback, self).__init__(
-        evaluation_context, all_updates, timers)
-
-
 class TransformExecutor(ExecutorService.CallableTask):
   """TransformExecutor will evaluate a bundle using an applied ptransform.
 
@@ -529,7 +522,7 @@ class _ExecutorServiceParallelExecutor(object):
         empty_bundle = (
             self._executor.evaluation_context.create_empty_committed_bundle(
                 applied_ptransform.inputs[0]))
-        timer_completion_callback = _TimerCompletionCallback(
+        timer_completion_callback = _CompletionCallback(
             self._executor.evaluation_context, self._executor.all_updates,
             applied_ptransform)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 662c61d..f34513c 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -278,13 +278,6 @@ class _TaggedReceivers(dict):
 
 class _ParDoEvaluator(_TransformEvaluator):
   """TransformEvaluator for ParDo transform."""
-
-  def __init__(self, evaluation_context, applied_ptransform,
-               input_committed_bundle, side_inputs, scoped_metrics_container):
-    super(_ParDoEvaluator, self).__init__(
-        evaluation_context, applied_ptransform, input_committed_bundle,
-        side_inputs, scoped_metrics_container)
-
   def start_bundle(self):
     transform = self._applied_ptransform.transform
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 7e7ec24..6c05951 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -111,8 +111,7 @@ def group_by_key_input_visitor():
       # pylint: disable=wrong-import-order, wrong-import-position
       from apache_beam import GroupByKey, GroupByKeyOnly
       from apache_beam import typehints
-      if (isinstance(transform_node.transform, GroupByKey) or
-          isinstance(transform_node.transform, GroupByKeyOnly)):
+      if isinstance(transform_node.transform, (GroupByKey, GroupByKeyOnly)):
         pcoll = transform_node.inputs[0]
         input_type = pcoll.element_type
         # If input_type is not specified, then treat it as `Any`.

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index 3cac658..51302b0 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -66,11 +66,8 @@ class PipelineStateMatcher(BaseMatcher):
 
 def retry_on_io_error_and_server_error(exception):
   """Filter allowing retries on file I/O errors and service error."""
-  if isinstance(exception, IOError) or \
-          (HttpError is not None and isinstance(exception, HttpError)):
-    return True
-  else:
-    return False
+  return isinstance(exception, IOError) or \
+          (HttpError is not None and isinstance(exception, HttpError))
 
 
 class FileChecksumMatcher(BaseMatcher):

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index f55d46a..a4cd462 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -95,8 +95,7 @@ class MeanCombineFn(core.CombineFn):
       return cy_combiners.MeanInt64Fn()
     elif input_type is float:
       return cy_combiners.MeanFloatFn()
-    else:
-      return self
+    return self
 
 
 class Count(object):
@@ -310,23 +309,19 @@ class TopCombineFn(core.CombineFn):
     if len(buffer) < self._n:
       if not buffer:
         return element_key, [element]
-      else:
-        buffer.append(element)
-        if lt(element_key, threshold):  # element_key < threshold
-          return element_key, buffer
-        else:
-          return accumulator  # with mutated buffer
+      buffer.append(element)
+      if lt(element_key, threshold):  # element_key < threshold
+        return element_key, buffer
+      return accumulator  # with mutated buffer
     elif lt(threshold, element_key):  # threshold < element_key
       buffer.append(element)
       if len(buffer) < self._buffer_size:
         return accumulator
-      else:
-        self._sort_buffer(buffer, lt)
-        min_element = buffer[-self._n]
-        threshold = self._key_fn(min_element) if self._key_fn else min_element
-        return threshold, buffer[-self._n:]
-    else:
-      return accumulator
+      self._sort_buffer(buffer, lt)
+      min_element = buffer[-self._n]
+      threshold = self._key_fn(min_element) if self._key_fn else min_element
+      return threshold, buffer[-self._n:]
+    return accumulator
 
   def merge_accumulators(self, accumulators, *args, **kwargs):
     accumulators = list(accumulators)
@@ -357,10 +352,6 @@ class TopCombineFn(core.CombineFn):
 
 
 class Largest(TopCombineFn):
-
-  def __init__(self, n):
-    super(Largest, self).__init__(n)
-
   def default_label(self):
     return 'Largest(%s)' % self._n
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 6c101fe..af76889 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -164,10 +164,10 @@ class CombineTest(unittest.TestCase):
             DisplayDataItemMatcher('fn', sampleFn.fn.__name__),
             DisplayDataItemMatcher('combine_fn',
                                    transform.fn.__class__)]
-        if len(args) > 0:
+        if args:
           expected_items.append(
               DisplayDataItemMatcher('args', str(args)))
-        if len(kwargs) > 0:
+        if kwargs:
           expected_items.append(
               DisplayDataItemMatcher('kwargs', str(kwargs)))
         hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/typehints/decorators.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/decorators.py b/sdks/python/apache_beam/typehints/decorators.py
index d8f0b1b..af6c499 100644
--- a/sdks/python/apache_beam/typehints/decorators.py
+++ b/sdks/python/apache_beam/typehints/decorators.py
@@ -237,8 +237,7 @@ def _unpack_positional_arg_hints(arg, hint):
     if isinstance(hint, typehints.TupleConstraint):
       return tuple(_unpack_positional_arg_hints(a, t)
                    for a, t in zip(arg, hint.tuple_types))
-    else:
-      return (typehints.Any,) * len(arg)
+    return (typehints.Any,) * len(arg)
   return hint
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bf474a0b/sdks/python/apache_beam/typehints/typehints.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py
index 1557d85..9b41adb 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -1039,8 +1039,7 @@ def is_consistent_with(sub, base):
   if isinstance(base, TypeConstraint):
     if isinstance(sub, UnionConstraint):
       return all(is_consistent_with(c, base) for c in sub.union_types)
-    else:
-      return base._consistent_with_check_(sub)
+    return base._consistent_with_check_(sub)
   elif isinstance(sub, TypeConstraint):
     # Nothing but object lives above any type constraints.
     return base == object


Mime
View raw message