beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [2/3] incubator-beam git commit: A few test fixes and other cleanups.
Date Sat, 15 Oct 2016 20:35:31 GMT
A few test fixes and other cleanups.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b4575854
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b4575854
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b4575854

Branch: refs/heads/python-sdk
Commit: b4575854b2c4aa9e56905dcd8f7385042a438eac
Parents: 88461ab
Author: Gus Katsiapis <katsiapis@katsiapis-linux.mtv.corp.google.com>
Authored: Thu Oct 6 09:55:38 2016 -0700
Committer: Robert Bradshaw <robertwb@gmail.com>
Committed: Sat Oct 15 13:30:06 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/avroio.py               | 4 ++--
 sdks/python/apache_beam/io/filebasedsource_test.py | 2 +-
 sdks/python/apache_beam/io/fileio.py               | 5 +++--
 sdks/python/apache_beam/io/textio_test.py          | 4 ++--
 4 files changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4575854/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 7de00df..53ed95a 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -287,7 +287,7 @@ class WriteToAvro(beam.transforms.PTransform):
       A WriteToAvro transform usable for writing.
     """
     self._args = (file_path_prefix, schema, codec, file_name_suffix, num_shards,
-                 shard_name_template, mime_type)
+                  shard_name_template, mime_type)
 
   def apply(self, pcoll):
     return pcoll | beam.io.iobase.Write(_AvroSink(*self._args))
@@ -323,4 +323,4 @@ class _AvroSink(fileio.FileSink):
         file_handle, avro.io.DatumWriter(), self._schema, self._codec)
 
   def write_record(self, writer, value):
-    writer.append(value)
\ No newline at end of file
+    writer.append(value)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4575854/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index f1ac482..2c68d2e 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -406,7 +406,7 @@ class TestFileBasedSource(unittest.TestCase):
     _, lines = write_data(10)
     filename = tempfile.NamedTemporaryFile(
         delete=False, prefix=tempfile.template, suffix='.bz2').name
-    with gzip.GzipFile(filename, 'wb') as f:
+    with bz2.BZ2File(filename, 'wb') as f:
       f.write('\n'.join(lines))
 
     pipeline = beam.Pipeline('DirectPipelineRunner')

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4575854/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index e6575b0..f74ac9c 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -633,7 +633,7 @@ class _CompressedFile(object):
           assert not self._decompressor.unused_data, 'Possible file corruption.'
           try:
             # EOF implies that the underlying BZIP2 stream must also have
-            # reached EFO. We expect this to raise an EOFError and we catch it
+            # reached EOF. We expect this to raise an EOFError and we catch it
             # below. Any other kind of error though would be problematic.
             self._decompressor.decompress('dummy')
             assert False, 'Possible file corruption.'
@@ -680,7 +680,8 @@ class _CompressedFile(object):
     if self._file is None:
       return
 
-    self.flush()
+    if self._writeable():
+      self._file.write(self._compressor.flush())
     self._file.close()
 
   def flush(self):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4575854/sdks/python/apache_beam/io/textio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index d42de2b..109506a 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -265,7 +265,7 @@ class TextSourceTest(unittest.TestCase):
     _, lines = write_data(15)
     file_name = tempfile.NamedTemporaryFile(
         delete=False, prefix=tempfile.template, suffix='.bz2').name
-    with bz2.BZFile(file_name, 'wb') as f:
+    with bz2.BZ2File(file_name, 'wb') as f:
       f.write('\n'.join(lines))
 
     pipeline = beam.Pipeline('DirectPipelineRunner')
@@ -289,7 +289,7 @@ class TextSourceTest(unittest.TestCase):
     _, lines = write_data(15)
     file_name = tempfile.NamedTemporaryFile(
         delete=False, prefix=tempfile.template).name
-    with bz2.BZFile(file_name, 'wb') as f:
+    with bz2.BZ2File(file_name, 'wb') as f:
       f.write('\n'.join(lines))
 
     pipeline = beam.Pipeline('DirectPipelineRunner')


Mime
View raw message