beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chamik...@apache.org
Subject [1/2] beam git commit: [BEAM-2497] Fix the reading of concat gzip files
Date Fri, 23 Jun 2017 05:13:29 GMT
Repository: beam
Updated Branches:
  refs/heads/master b3c36256e -> 336b7f1cf


[BEAM-2497] Fix the reading of concat gzip files


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8dcda6e4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8dcda6e4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8dcda6e4

Branch: refs/heads/master
Commit: 8dcda6e40355af13f4d92fcd44aae4539a225a4a
Parents: b3c3625
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Thu Jun 22 17:08:20 2017 -0700
Committer: Chamikara Jayalath <chamikara@google.com>
Committed: Thu Jun 22 22:12:56 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/snippets/snippets_test.py  | 16 ++++++++++++++++
 sdks/python/apache_beam/io/filesystem.py            |  8 ++++++++
 2 files changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8dcda6e4/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 9183d0d..31f71b3 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -589,6 +589,22 @@ class SnippetsTest(unittest.TestCase):
     snippets.model_textio_compressed(
         {'read': gzip_file_name}, ['aa', 'bb', 'cc'])
 
+  def test_model_textio_gzip_concatenated(self):
+    temp_path_1 = self.create_temp_file('a\nb\nc\n')
+    temp_path_2 = self.create_temp_file('p\nq\nr\n')
+    temp_path_3 = self.create_temp_file('x\ny\nz')
+    gzip_file_name = temp_path_1 + '.gz'
+    with open(temp_path_1) as src, gzip.open(gzip_file_name, 'wb') as dst:
+      dst.writelines(src)
+    with open(temp_path_2) as src, gzip.open(gzip_file_name, 'ab') as dst:
+      dst.writelines(src)
+    with open(temp_path_3) as src, gzip.open(gzip_file_name, 'ab') as dst:
+      dst.writelines(src)
+      # Add the temporary gzip file to be cleaned up as well.
+      self.temp_files.append(gzip_file_name)
+    snippets.model_textio_compressed(
+        {'read': gzip_file_name}, ['a', 'b', 'c', 'p', 'q', 'r', 'x', 'y', 'z'])
+
   @unittest.skipIf(datastore_pb2 is None, 'GCP dependencies are not installed')
   def test_model_datastoreio(self):
     # We cannot test datastoreio functionality in unit tests therefore we limit

http://git-wip-us.apache.org/repos/asf/beam/blob/8dcda6e4/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index f553026..1f65d0a 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -201,6 +201,14 @@ class CompressedFile(object):
             assert False, 'Possible file corruption.'
           except EOFError:
             pass  # All is as expected!
+        elif self._compression_type == CompressionTypes.GZIP:
+          # For gzip, check for leftover data from concatenated members.
+          if self._decompressor.unused_data != '':
+            buf = self._decompressor.unused_data
+            self._decompressor = zlib.decompressobj(self._gzip_mask)
+            decompressed = self._decompressor.decompress(buf)
+            self._read_buffer.write(decompressed)
+            continue
         else:
           self._read_buffer.write(self._decompressor.flush())
 


Mime
View raw message