beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/3] beam git commit: [BEAM-1964] Fix lint issues for linter upgrade -3
Date Fri, 14 Apr 2017 20:06:50 GMT
Repository: beam
Updated Branches:
  refs/heads/master cf9ac454d -> 89ff0b145


[BEAM-1964] Fix lint issues for linter upgrade -3


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bd79f4d8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bd79f4d8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bd79f4d8

Branch: refs/heads/master
Commit: bd79f4d8bacba116a4c7f188cad0cdbf507d36d8
Parents: bf474a0
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Fri Apr 14 11:22:25 2017 -0700
Committer: Ahmet Altay <altay@altay-macbookpro2.roam.corp.google.com>
Committed: Fri Apr 14 13:06:14 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/concat_source.py     | 74 +++++++++-----------
 .../apache_beam/io/filebasedsource_test.py      |  2 +-
 sdks/python/apache_beam/io/fileio.py            |  6 +-
 sdks/python/apache_beam/io/filesystems_util.py  |  3 +-
 sdks/python/apache_beam/io/gcp/bigquery.py      | 13 ++--
 sdks/python/apache_beam/io/iobase.py            |  7 +-
 sdks/python/apache_beam/io/localfilesystem.py   |  3 +-
 sdks/python/apache_beam/io/range_trackers.py    | 19 +++--
 sdks/python/apache_beam/io/source_test_utils.py |  7 +-
 sdks/python/apache_beam/io/textio.py            | 13 ++--
 sdks/python/apache_beam/transforms/combiners.py | 29 ++++----
 11 files changed, 81 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/concat_source.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/concat_source.py b/sdks/python/apache_beam/io/concat_source.py
index 1656180..de51f0f 100644
--- a/sdks/python/apache_beam/io/concat_source.py
+++ b/sdks/python/apache_beam/io/concat_source.py
@@ -84,8 +84,7 @@ class ConcatSource(iobase.BoundedSource):
       # Getting coder from the first sub-sources. This assumes all sub-sources
       # to produce the same coder.
       return self._source_bundles[0].source.default_output_coder()
-    else:
-      return super(ConcatSource, self).default_output_coder()
+    return super(ConcatSource, self).default_output_coder()
 
 
 class ConcatRangeTracker(iobase.RangeTracker):
@@ -165,13 +164,12 @@ class ConcatRangeTracker(iobase.RangeTracker):
         return False
       elif source_ix == self._end[0] and self._end[1] is None:
         return False
-      else:
-        assert source_ix >= self._claimed_source_ix
-        self._claimed_source_ix = source_ix
-        if source_pos is None:
-          return True
-        else:
-          return self.sub_range_tracker(source_ix).try_claim(source_pos)
+
+      assert source_ix >= self._claimed_source_ix
+      self._claimed_source_ix = source_ix
+      if source_pos is None:
+        return True
+      return self.sub_range_tracker(source_ix).try_claim(source_pos)
 
   def try_split(self, pos):
     source_ix, source_pos = pos
@@ -185,24 +183,24 @@ class ConcatRangeTracker(iobase.RangeTracker):
       elif source_ix == self._end[0] and self._end[1] is None:
         # At/after end.
         return None
+
+      if source_ix > self._claimed_source_ix:
+        # Prefer to split on even boundary.
+        split_pos = None
+        ratio = self._cumulative_weights[source_ix]
       else:
-        if source_ix > self._claimed_source_ix:
-          # Prefer to split on even boundary.
-          split_pos = None
-          ratio = self._cumulative_weights[source_ix]
-        else:
-          # Split the current subsource.
-          split = self.sub_range_tracker(source_ix).try_split(
-              source_pos)
-          if not split:
-            return None
-          split_pos, frac = split
-          ratio = self.local_to_global(source_ix, frac)
-
-        self._end = source_ix, split_pos
-        self._cumulative_weights = [min(w / ratio, 1)
-                                    for w in self._cumulative_weights]
-        return (source_ix, split_pos), ratio
+        # Split the current subsource.
+        split = self.sub_range_tracker(source_ix).try_split(
+            source_pos)
+        if not split:
+          return None
+        split_pos, frac = split
+        ratio = self.local_to_global(source_ix, frac)
+
+      self._end = source_ix, split_pos
+      self._cumulative_weights = [min(w / ratio, 1)
+                                  for w in self._cumulative_weights]
+      return (source_ix, split_pos), ratio
 
   def set_current_position(self, pos):
     raise NotImplementedError('Should only be called on sub-trackers')
@@ -212,10 +210,9 @@ class ConcatRangeTracker(iobase.RangeTracker):
     last = self._end[0] if self._end[1] is None else self._end[0] + 1
     if source_ix == last:
       return (source_ix, None)
-    else:
-      return (source_ix,
-              self.sub_range_tracker(source_ix).position_at_fraction(
-                  source_frac))
+    return (source_ix,
+            self.sub_range_tracker(source_ix).position_at_fraction(
+                source_frac))
 
   def fraction_consumed(self):
     with self._lock:
@@ -234,15 +231,14 @@ class ConcatRangeTracker(iobase.RangeTracker):
     if frac == 1:
       last = self._end[0] if self._end[1] is None else self._end[0] + 1
       return (last, None)
-    else:
-      cw = self._cumulative_weights
-      # Find the last source that starts at or before frac.
-      source_ix = bisect.bisect(cw, frac) - 1
-      # Return this source, converting what's left of frac after starting
-      # this source into a value in [0.0, 1.0) representing how far we are
-      # towards the next source.
-      return (source_ix,
-              (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
+    cw = self._cumulative_weights
+    # Find the last source that starts at or before frac.
+    source_ix = bisect.bisect(cw, frac) - 1
+    # Return this source, converting what's left of frac after starting
+    # this source into a value in [0.0, 1.0) representing how far we are
+    # towards the next source.
+    return (source_ix,
+            (frac - cw[source_ix]) / (cw[source_ix + 1] - cw[source_ix]))
 
   def sub_range_tracker(self, source_ix):
     assert self._start[0] <= source_ix <= self._end[0]

http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 7b7ec8a..24a31b1 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -115,10 +115,10 @@ def _write_prepared_data(data, directory=None,
 
 
 def write_prepared_pattern(data, suffixes=None):
+  assert data, 'Data (%s) seems to be empty' % data
   if suffixes is None:
     suffixes = [''] * len(data)
   temp_dir = tempfile.mkdtemp()
-  assert len(data) > 0
   for i, d in enumerate(data):
     file_name = _write_prepared_data(d, temp_dir, prefix='mytemp',
                                      suffix=suffixes[i])

http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index b128dc5..dc8957e 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -65,7 +65,7 @@ class ChannelFactory(object):
   def rename_batch(src_dest_pairs):
     sources = [s for s, _ in src_dest_pairs]
     destinations = [d for _, d in src_dest_pairs]
-    if len(sources) == 0:
+    if not sources:
       return []
     bfs = get_filesystem(sources[0])
     try:
@@ -165,7 +165,7 @@ class FileSink(iobase.Sink):
 
     if shard_name_template is None:
       shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE
-    elif shard_name_template is '':
+    elif shard_name_template == '':
       num_shards = 1
     self.file_path_prefix = file_path_prefix
     self.file_name_suffix = file_name_suffix
@@ -275,7 +275,7 @@ class FileSink(iobase.Sink):
         return exceptions
       except BeamIOError as exp:
         if exp.exception_details is None:
-          raise exp
+          raise
         for (src, dest), exception in exp.exception_details.iteritems():
           if exception:
             logging.warning('Rename not successful: %s -> %s, %s', src, dest,

http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/filesystems_util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystems_util.py b/sdks/python/apache_beam/io/filesystems_util.py
index 6d21298..5034068 100644
--- a/sdks/python/apache_beam/io/filesystems_util.py
+++ b/sdks/python/apache_beam/io/filesystems_util.py
@@ -32,5 +32,4 @@ def get_filesystem(path):
           'Google Cloud Platform IO not available, '
           'please install apache_beam[gcp]')
     return GCSFileSystem()
-  else:
-    return LocalFileSystem()
+  return LocalFileSystem()

http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 9a8174a..25f544d 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -967,13 +967,12 @@ class BigQueryWrapper(object):
           % (project_id, dataset_id, table_id))
     if found_table and write_disposition != BigQueryDisposition.WRITE_TRUNCATE:
       return found_table
-    else:
-      # if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete
-      # the table before this point.
-      return self._create_table(project_id=project_id,
-                                dataset_id=dataset_id,
-                                table_id=table_id,
-                                schema=schema or found_table.schema)
+    # if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete
+    # the table before this point.
+    return self._create_table(project_id=project_id,
+                              dataset_id=dataset_id,
+                              table_id=table_id,
+                              schema=schema or found_table.schema)
 
   def run_query(self, project_id, query, use_legacy_sql, flatten_results,
                 dry_run=False):

http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 512824b..d9df5c4 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -805,8 +805,7 @@ class Read(ptransform.PTransform):
   def _infer_output_coder(self, input_type=None, input_coder=None):
     if isinstance(self.source, BoundedSource):
       return self.source.default_output_coder()
-    else:
-      return self.source.coder
+    return self.source.coder
 
   def display_data(self):
     return {'source': DisplayDataItem(self.source.__class__,
@@ -945,8 +944,8 @@ class _WriteKeyedBundleDoFn(core.DoFn):
   def process(self, element, init_result):
     bundle = element
     writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
-    for element in bundle[1]:  # values
-      writer.write(element)
+    for e in bundle[1]:  # values
+      writer.write(e)
     return [window.TimestampedValue(writer.close(), window.MAX_TIMESTAMP)]
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/localfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index 46589b0..7637f2a 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -93,8 +93,7 @@ class LocalFileSystem(FileSystem):
     raw_file = open(path, mode)
     if compression_type == CompressionTypes.UNCOMPRESSED:
       return raw_file
-    else:
-      return CompressedFile(raw_file, compression_type=compression_type)
+    return CompressedFile(raw_file, compression_type=compression_type)
 
   def create(self, path, mime_type='application/octet-stream',
              compression_type=CompressionTypes.AUTO):

http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/range_trackers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index 8627f76..6e7b84f 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -42,9 +42,9 @@ class OffsetRangeTracker(iobase.RangeTracker):
       raise ValueError('Start offset must not be \'None\'')
     if end is None:
       raise ValueError('End offset must not be \'None\'')
-    assert isinstance(start, int) or isinstance(start, long)
+    assert isinstance(start, (int, long))
     if end != self.OFFSET_INFINITY:
-      assert isinstance(end, int) or isinstance(end, long)
+      assert isinstance(end, (int, long))
 
     assert start <= end
 
@@ -91,8 +91,8 @@ class OffsetRangeTracker(iobase.RangeTracker):
           'The first record [starting at %d] must be at a split point' %
           record_start)
 
-    if (split_point and self._offset_of_last_split_point is not -1 and
-        record_start is self._offset_of_last_split_point):
+    if (split_point and self._offset_of_last_split_point != -1 and
+        record_start == self._offset_of_last_split_point):
       raise ValueError(
           'Record at a split point has same offset as the previous split '
           'point: %d' % record_start)
@@ -354,8 +354,7 @@ class OrderedPositionRangeTracker(iobase.RangeTracker):
       if self._stop_position is None or position < self._stop_position:
         self._last_claim = position
         return True
-      else:
-        return False
+      return False
 
   def position_at_fraction(self, fraction):
     return self.fraction_to_position(
@@ -373,15 +372,13 @@ class OrderedPositionRangeTracker(iobase.RangeTracker):
             position, start=self._start_position, end=self._stop_position)
         self._stop_position = position
         return position, fraction
-      else:
-        return None
+      return None
 
   def fraction_consumed(self):
     if self._last_claim is self.UNSTARTED:
       return 0
-    else:
-      return self.position_to_fraction(
-          self._last_claim, self._start_position, self._stop_position)
+    return self.position_to_fraction(
+        self._last_claim, self._start_position, self._stop_position)
 
   def position_to_fraction(self, pos, start, end):
     """

http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/source_test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py
index 07de738..542e9f6 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -611,10 +611,9 @@ def _assertSplitAtFractionConcurrent(
   def read_or_split(test_params):
     if test_params[0]:
       return [val for val in test_params[1]]
-    else:
-      position = test_params[1].position_at_fraction(test_params[2])
-      result = test_params[1].try_split(position)
-      return result
+    position = test_params[1].position_at_fraction(test_params[2])
+    result = test_params[1].try_split(position)
+    return result
 
   inputs = []
   pool = thread_pool if thread_pool else _ThreadPool(2)

http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 9217e74..b6a24b0 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -198,9 +198,9 @@ class _TextSource(filebasedsource.FileBasedSource):
         if next_lf > 0 and read_buffer.data[next_lf - 1] == '\r':
           # Found a '\r\n'. Accepting that as the next separator.
           return (next_lf - 1, next_lf + 1)
-        else:
-          # Found a '\n'. Accepting that as the next separator.
-          return (next_lf, next_lf + 1)
+
+        # Found a '\n'. Accepting that as the next separator.
+        return (next_lf, next_lf + 1)
 
       current_pos = len(read_buffer.data)
 
@@ -256,10 +256,9 @@ class _TextSource(filebasedsource.FileBasedSource):
       # Current record should not contain the separator.
       return (read_buffer.data[record_start_position_in_buffer:sep_bounds[0]],
               sep_bounds[1] - record_start_position_in_buffer)
-    else:
-      # Current record should contain the separator.
-      return (read_buffer.data[record_start_position_in_buffer:sep_bounds[1]],
-              sep_bounds[1] - record_start_position_in_buffer)
+    # Current record should contain the separator.
+    return (read_buffer.data[record_start_position_in_buffer:sep_bounds[1]],
+            sep_bounds[1] - record_start_position_in_buffer)
 
 
 class _TextSink(fileio.FileSink):

http://git-wip-us.apache.org/repos/asf/beam/blob/bd79f4d8/sdks/python/apache_beam/transforms/combiners.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index a4cd462..f812832 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -531,27 +531,26 @@ def curry_combine_fn(fn, args, kwargs):
   if not args and not kwargs:
     return fn
 
-  else:
+  # Create CurriedFn class for the combiner
+  class CurriedFn(core.CombineFn):
+    """CombineFn that applies extra arguments."""
 
-    class CurriedFn(core.CombineFn):
-      """CombineFn that applies extra arguments."""
+    def create_accumulator(self):
+      return fn.create_accumulator(*args, **kwargs)
 
-      def create_accumulator(self):
-        return fn.create_accumulator(*args, **kwargs)
+    def add_input(self, accumulator, element):
+      return fn.add_input(accumulator, element, *args, **kwargs)
 
-      def add_input(self, accumulator, element):
-        return fn.add_input(accumulator, element, *args, **kwargs)
+    def merge_accumulators(self, accumulators):
+      return fn.merge_accumulators(accumulators, *args, **kwargs)
 
-      def merge_accumulators(self, accumulators):
-        return fn.merge_accumulators(accumulators, *args, **kwargs)
+    def extract_output(self, accumulator):
+      return fn.extract_output(accumulator, *args, **kwargs)
 
-      def extract_output(self, accumulator):
-        return fn.extract_output(accumulator, *args, **kwargs)
+    def apply(self, elements):
+      return fn.apply(elements, *args, **kwargs)
 
-      def apply(self, elements):
-        return fn.apply(elements, *args, **kwargs)
-
-    return CurriedFn()
+  return CurriedFn()
 
 
 class PhasedCombineFnExecutor(object):


Mime
View raw message