beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [1/2] beam git commit: Adds support for reading concatenated bzip2 files.
Date Fri, 04 Aug 2017 06:01:04 GMT
Repository: beam
Updated Branches:
  refs/heads/release-2.1.0 58928b942 -> bad996bb4


Adds support for reading concatenated bzip2 files.

Adds tests for concatenated gzip and bzip2 files.

Removes test 'test_model_textio_gzip_concatenated' in 'snippets_test.py' since it's actually
hitting 'DummyReadTransform' and not testing this feature.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d6516c69
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d6516c69
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d6516c69

Branch: refs/heads/release-2.1.0
Commit: d6516c69e61f2061005d01a9e36ee1e4137a1478
Parents: 58928b9
Author: chamikara@google.com <chamikara@google.com>
Authored: Wed Aug 2 22:49:33 2017 -0700
Committer: chamikara@google.com <chamikara@google.com>
Committed: Thu Aug 3 11:11:00 2017 -0700

----------------------------------------------------------------------
 .../examples/snippets/snippets_test.py          |  16 ---
 sdks/python/apache_beam/io/filesystem.py        |  31 +++--
 sdks/python/apache_beam/io/textio_test.py       | 115 +++++++++++++++++++
 3 files changed, 129 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d6516c69/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 31f71b3..9183d0d 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -589,22 +589,6 @@ class SnippetsTest(unittest.TestCase):
     snippets.model_textio_compressed(
         {'read': gzip_file_name}, ['aa', 'bb', 'cc'])
 
-  def test_model_textio_gzip_concatenated(self):
-    temp_path_1 = self.create_temp_file('a\nb\nc\n')
-    temp_path_2 = self.create_temp_file('p\nq\nr\n')
-    temp_path_3 = self.create_temp_file('x\ny\nz')
-    gzip_file_name = temp_path_1 + '.gz'
-    with open(temp_path_1) as src, gzip.open(gzip_file_name, 'wb') as dst:
-      dst.writelines(src)
-    with open(temp_path_2) as src, gzip.open(gzip_file_name, 'ab') as dst:
-      dst.writelines(src)
-    with open(temp_path_3) as src, gzip.open(gzip_file_name, 'ab') as dst:
-      dst.writelines(src)
-      # Add the temporary gzip file to be cleaned up as well.
-      self.temp_files.append(gzip_file_name)
-    snippets.model_textio_compressed(
-        {'read': gzip_file_name}, ['a', 'b', 'c', 'p', 'q', 'r', 'x', 'y', 'z'])
-
   @unittest.skipIf(datastore_pb2 is None, 'GCP dependencies are not installed')
   def test_model_datastoreio(self):
     # We cannot test datastoreio functionality in unit tests therefore we limit

http://git-wip-us.apache.org/repos/asf/beam/blob/d6516c69/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index 1f65d0a..ef3040c 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -187,29 +187,26 @@ class CompressedFile(object):
         del buf  # Free up some possibly large and no-longer-needed memory.
         self._read_buffer.write(decompressed)
       else:
-        # EOF reached.
-        # Verify completeness and no corruption and flush (if needed by
-        # the underlying algorithm).
-        if self._compression_type == CompressionTypes.BZIP2:
-          # Having unused_data past end of stream would imply file corruption.
-          assert not self._decompressor.unused_data, 'Possible file corruption.'
-          try:
-            # EOF implies that the underlying BZIP2 stream must also have
-            # reached EOF. We expect this to raise an EOFError and we catch it
-            # below. Any other kind of error though would be problematic.
-            self._decompressor.decompress('dummy')
-            assert False, 'Possible file corruption.'
-          except EOFError:
-            pass  # All is as expected!
-        elif self._compression_type == CompressionTypes.GZIP:
-          # If Gzip file check if there is unused data generated by gzip concat
+        # EOF of current stream reached.
+        #
+        # Any unused compressed data left at the end of the stream of a gzip
+        # or bzip2 file that is not corrupted points to a concatenated
+        # compressed file. We read concatenated files by recursively creating
+        # decompressor objects for the unused compressed data.
+        if (self._compression_type == CompressionTypes.BZIP2 or
+            self._compression_type == CompressionTypes.GZIP):
           if self._decompressor.unused_data != '':
             buf = self._decompressor.unused_data
-            self._decompressor = zlib.decompressobj(self._gzip_mask)
+            self._decompressor = (
+                bz2.BZ2Decompressor()
+                if self._compression_type == CompressionTypes.BZIP2
+                else zlib.decompressobj(self._gzip_mask))
             decompressed = self._decompressor.decompress(buf)
             self._read_buffer.write(decompressed)
             continue
         else:
+          # Gzip and bzip2 formats do not require flushing remaining data in the
+          # decompressor into the read buffer when fully decompressing files.
           self._read_buffer.write(self._decompressor.flush())
 
         # Record that we have hit the end of file, so we won't unnecessarily

http://git-wip-us.apache.org/repos/asf/beam/blob/d6516c69/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index 9a4ec47..8bd7116 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -401,6 +401,64 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp):
     assert_that(pcoll, equal_to(lines))
     pipeline.run()
 
+  def test_read_corrupted_bzip2_fails(self):
+    _, lines = write_data(15)
+    file_name = self._create_temp_file()
+    with bz2.BZ2File(file_name, 'wb') as f:
+      f.write('\n'.join(lines))
+
+    with open(file_name, 'wb') as f:
+      f.write('corrupt')
+
+    pipeline = TestPipeline()
+    pcoll = pipeline | 'Read' >> ReadFromText(
+        file_name,
+        compression_type=CompressionTypes.BZIP2)
+    assert_that(pcoll, equal_to(lines))
+    with self.assertRaises(Exception):
+      pipeline.run()
+
+  def test_read_bzip2_concat(self):
+    bzip2_file_name1 = self._create_temp_file()
+    lines = ['a', 'b', 'c']
+    with bz2.BZ2File(bzip2_file_name1, 'wb') as dst:
+      data = '\n'.join(lines) + '\n'
+      dst.write(data)
+
+    bzip2_file_name2 = self._create_temp_file()
+    lines = ['p', 'q', 'r']
+    with bz2.BZ2File(bzip2_file_name2, 'wb') as dst:
+      data = '\n'.join(lines) + '\n'
+      dst.write(data)
+
+    bzip2_file_name3 = self._create_temp_file()
+    lines = ['x', 'y', 'z']
+    with bz2.BZ2File(bzip2_file_name3, 'wb') as dst:
+      data = '\n'.join(lines) + '\n'
+      dst.write(data)
+
+    final_bzip2_file = self._create_temp_file()
+    with open(bzip2_file_name1, 'rb') as src, open(
+        final_bzip2_file, 'wb') as dst:
+      dst.writelines(src.readlines())
+
+    with open(bzip2_file_name2, 'rb') as src, open(
+        final_bzip2_file, 'ab') as dst:
+      dst.writelines(src.readlines())
+
+    with open(bzip2_file_name3, 'rb') as src, open(
+        final_bzip2_file, 'ab') as dst:
+      dst.writelines(src.readlines())
+
+    pipeline = TestPipeline()
+    lines = pipeline | 'ReadFromText' >> beam.io.ReadFromText(
+        final_bzip2_file,
+        compression_type=beam.io.filesystem.CompressionTypes.BZIP2)
+
+    expected = ['a', 'b', 'c', 'p', 'q', 'r', 'x', 'y', 'z']
+    assert_that(lines, equal_to(expected))
+    pipeline.run()
+
   def test_read_gzip(self):
     _, lines = write_data(15)
     file_name = self._create_temp_file()
@@ -415,6 +473,63 @@ class TextSourceTest(_TestCaseWithTempDirCleanUp):
     assert_that(pcoll, equal_to(lines))
     pipeline.run()
 
+  def test_read_corrupted_gzip_fails(self):
+    _, lines = write_data(15)
+    file_name = self._create_temp_file()
+    with gzip.GzipFile(file_name, 'wb') as f:
+      f.write('\n'.join(lines))
+
+    with open(file_name, 'wb') as f:
+      f.write('corrupt')
+
+    pipeline = TestPipeline()
+    pcoll = pipeline | 'Read' >> ReadFromText(
+        file_name,
+        0, CompressionTypes.GZIP,
+        True, coders.StrUtf8Coder())
+    assert_that(pcoll, equal_to(lines))
+
+    with self.assertRaises(Exception):
+      pipeline.run()
+
+  def test_read_gzip_concat(self):
+    gzip_file_name1 = self._create_temp_file()
+    lines = ['a', 'b', 'c']
+    with gzip.open(gzip_file_name1, 'wb') as dst:
+      data = '\n'.join(lines) + '\n'
+      dst.write(data)
+
+    gzip_file_name2 = self._create_temp_file()
+    lines = ['p', 'q', 'r']
+    with gzip.open(gzip_file_name2, 'wb') as dst:
+      data = '\n'.join(lines) + '\n'
+      dst.write(data)
+
+    gzip_file_name3 = self._create_temp_file()
+    lines = ['x', 'y', 'z']
+    with gzip.open(gzip_file_name3, 'wb') as dst:
+      data = '\n'.join(lines) + '\n'
+      dst.write(data)
+
+    final_gzip_file = self._create_temp_file()
+    with open(gzip_file_name1, 'rb') as src, open(final_gzip_file, 'wb') as dst:
+      dst.writelines(src.readlines())
+
+    with open(gzip_file_name2, 'rb') as src, open(final_gzip_file, 'ab') as dst:
+      dst.writelines(src.readlines())
+
+    with open(gzip_file_name3, 'rb') as src, open(final_gzip_file, 'ab') as dst:
+      dst.writelines(src.readlines())
+
+    pipeline = TestPipeline()
+    lines = pipeline | 'ReadFromText' >> beam.io.ReadFromText(
+        final_gzip_file,
+        compression_type=beam.io.filesystem.CompressionTypes.GZIP)
+
+    expected = ['a', 'b', 'c', 'p', 'q', 'r', 'x', 'y', 'z']
+    assert_that(lines, equal_to(expected))
+    pipeline.run()
+
   def test_read_gzip_large(self):
     _, lines = write_data(10000)
     file_name = self._create_temp_file()


Mime
View raw message