beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: CP #3111 Rename filesink to filebasedsink
Date Fri, 12 May 2017 01:20:17 GMT
Repository: beam
Updated Branches:
  refs/heads/release-2.0.0 da2476d0b -> 905ebccf2


CP #3111 Rename filesink to filebasedsink


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/717ab8c1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/717ab8c1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/717ab8c1

Branch: refs/heads/release-2.0.0
Commit: 717ab8c14cb8c166168c812d4bd16e603831af47
Parents: da2476d
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Thu May 11 17:23:40 2017 -0700
Committer: Ahmet Altay <altay@google.com>
Committed: Thu May 11 18:19:55 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/__init__.py          |   2 +-
 sdks/python/apache_beam/io/avroio.py            |   4 +-
 sdks/python/apache_beam/io/filebasedsink.py     | 299 ++++++++++++++++++
 .../python/apache_beam/io/filebasedsink_test.py | 303 ++++++++++++++++++
 sdks/python/apache_beam/io/fileio.py            | 304 -------------------
 sdks/python/apache_beam/io/fileio_test.py       | 303 ------------------
 sdks/python/apache_beam/io/gcp/gcsio.py         |   6 +-
 sdks/python/apache_beam/io/iobase.py            |  12 +-
 sdks/python/apache_beam/io/textio.py            |   4 +-
 sdks/python/apache_beam/io/tfrecordio.py        |   6 +-
 .../apache_beam/testing/pipeline_verifiers.py   |   4 +-
 11 files changed, 623 insertions(+), 624 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/io/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/__init__.py b/sdks/python/apache_beam/io/__init__.py
index 881ce68..6ea0efd 100644
--- a/sdks/python/apache_beam/io/__init__.py
+++ b/sdks/python/apache_beam/io/__init__.py
@@ -19,7 +19,7 @@
 
 # pylint: disable=wildcard-import
 from apache_beam.io.avroio import *
-from apache_beam.io.fileio import *
+from apache_beam.io.filebasedsink import *
 from apache_beam.io.iobase import Read
 from apache_beam.io.iobase import Sink
 from apache_beam.io.iobase import Write

http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 1c08c68..e02e1f7 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -27,7 +27,7 @@ from avro import schema
 
 import apache_beam as beam
 from apache_beam.io import filebasedsource
-from apache_beam.io import fileio
+from apache_beam.io import filebasedsink
 from apache_beam.io import iobase
 from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.iobase import Read
@@ -335,7 +335,7 @@ class WriteToAvro(beam.transforms.PTransform):
     return {'sink_dd': self._sink}
 
 
-class _AvroSink(fileio.FileSink):
+class _AvroSink(filebasedsink.FileBasedSink):
   """A sink to avro files."""
 
   def __init__(self,

http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/io/filebasedsink.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsink.py b/sdks/python/apache_beam/io/filebasedsink.py
new file mode 100644
index 0000000..76c09fc
--- /dev/null
+++ b/sdks/python/apache_beam/io/filebasedsink.py
@@ -0,0 +1,299 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""File-based sink."""
+
+from __future__ import absolute_import
+
+import logging
+import os
+import re
+import time
+import uuid
+
+from apache_beam.internal import util
+from apache_beam.io import iobase
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.io.filesystem import CompressionTypes
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.transforms.display import DisplayDataItem
+from apache_beam.options.value_provider import ValueProvider
+from apache_beam.options.value_provider import StaticValueProvider
+from apache_beam.options.value_provider import check_accessible
+
+DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN'
+
+__all__ = ['FileBasedSink']
+
+
+class FileBasedSink(iobase.Sink):
+  """A sink to GCS or local files.
+
+  To implement a file-based sink, extend this class and override
+  either ``write_record()`` or ``write_encoded_record()``.
+
+  If needed, also overwrite ``open()`` and/or ``close()`` to customize the
+  file handling or write headers and footers.
+
+  The output of this write is a PCollection of all written shards.
+  """
+
+  # Max number of threads to be used for renaming.
+  _MAX_RENAME_THREADS = 64
+
+  def __init__(self,
+               file_path_prefix,
+               coder,
+               file_name_suffix='',
+               num_shards=0,
+               shard_name_template=None,
+               mime_type='application/octet-stream',
+               compression_type=CompressionTypes.AUTO):
+    """
+    Raises:
+      TypeError: if file path parameters are not a string or ValueProvider,
+                 or if compression_type is not a member of CompressionTypes.
+      ValueError: if shard_name_template is not of expected format.
+    """
+    if not isinstance(file_path_prefix, (basestring, ValueProvider)):
+      raise TypeError('file_path_prefix must be a string or ValueProvider;'
+                      'got %r instead' % file_path_prefix)
+    if not isinstance(file_name_suffix, (basestring, ValueProvider)):
+      raise TypeError('file_name_suffix must be a string or ValueProvider;'
+                      'got %r instead' % file_name_suffix)
+
+    if not CompressionTypes.is_valid_compression_type(compression_type):
+      raise TypeError('compression_type must be CompressionType object but '
+                      'was %s' % type(compression_type))
+    if shard_name_template is None:
+      shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE
+    elif shard_name_template == '':
+      num_shards = 1
+    if isinstance(file_path_prefix, basestring):
+      file_path_prefix = StaticValueProvider(str, file_path_prefix)
+    if isinstance(file_name_suffix, basestring):
+      file_name_suffix = StaticValueProvider(str, file_name_suffix)
+    self.file_path_prefix = file_path_prefix
+    self.file_name_suffix = file_name_suffix
+    self.num_shards = num_shards
+    self.coder = coder
+    self.shard_name_format = self._template_to_format(shard_name_template)
+    self.compression_type = compression_type
+    self.mime_type = mime_type
+
+  def display_data(self):
+    return {'shards':
+            DisplayDataItem(self.num_shards,
+                            label='Number of Shards').drop_if_default(0),
+            'compression':
+            DisplayDataItem(str(self.compression_type)),
+            'file_pattern':
+            DisplayDataItem('{}{}{}'.format(self.file_path_prefix,
+                                            self.shard_name_format,
+                                            self.file_name_suffix),
+                            label='File Pattern')}
+
+  @check_accessible(['file_path_prefix'])
+  def open(self, temp_path):
+    """Opens ``temp_path``, returning an opaque file handle object.
+
+    The returned file handle is passed to ``write_[encoded_]record`` and
+    ``close``.
+    """
+    return FileSystems.create(temp_path, self.mime_type, self.compression_type)
+
+  def write_record(self, file_handle, value):
+    """Writes a single record to the file handle returned by ``open()``.
+
+    By default, calls ``write_encoded_record`` after encoding the record with
+    this sink's Coder.
+    """
+    self.write_encoded_record(file_handle, self.coder.encode(value))
+
+  def write_encoded_record(self, file_handle, encoded_value):
+    """Writes a single encoded record to the file handle returned by ``open()``.
+    """
+    raise NotImplementedError
+
+  def close(self, file_handle):
+    """Finalize and close the file handle returned from ``open()``.
+
+    Called after all records are written.
+
+    By default, calls ``file_handle.close()`` iff it is not None.
+    """
+    if file_handle is not None:
+      file_handle.close()
+
+  @check_accessible(['file_path_prefix', 'file_name_suffix'])
+  def initialize_write(self):
+    file_path_prefix = self.file_path_prefix.get()
+
+    tmp_dir = self._create_temp_dir(file_path_prefix)
+    FileSystems.mkdirs(tmp_dir)
+    return tmp_dir
+
+  def _create_temp_dir(self, file_path_prefix):
+    base_path, last_component = FileSystems.split(file_path_prefix)
+    if not last_component:
+      # Trying to re-split the base_path to check if it's a root.
+      new_base_path, _ = FileSystems.split(base_path)
+      if base_path == new_base_path:
+        raise ValueError('Cannot create a temporary directory for root path '
+                         'prefix %s. Please specify a file path prefix with '
+                         'at least two components.',
+                         file_path_prefix)
+    path_components = [base_path,
+                       'beam-temp-' + last_component + '-' + uuid.uuid1().hex]
+    return FileSystems.join(*path_components)
+
+  @check_accessible(['file_path_prefix', 'file_name_suffix'])
+  def open_writer(self, init_result, uid):
+    # A proper suffix is needed for AUTO compression detection.
+    # We also ensure there will be no collisions with uid and a
+    # (possibly unsharded) file_path_prefix and a (possibly empty)
+    # file_name_suffix.
+    file_path_prefix = self.file_path_prefix.get()
+    file_name_suffix = self.file_name_suffix.get()
+    suffix = (
+        '.' + os.path.basename(file_path_prefix) + file_name_suffix)
+    return FileBasedSinkWriter(self, os.path.join(init_result, uid) + suffix)
+
+  @check_accessible(['file_path_prefix', 'file_name_suffix'])
+  def finalize_write(self, init_result, writer_results):
+    file_path_prefix = self.file_path_prefix.get()
+    file_name_suffix = self.file_name_suffix.get()
+    writer_results = sorted(writer_results)
+    num_shards = len(writer_results)
+    min_threads = min(num_shards, FileBasedSink._MAX_RENAME_THREADS)
+    num_threads = max(1, min_threads)
+
+    source_files = []
+    destination_files = []
+    chunk_size = FileSystems.get_chunk_size(file_path_prefix)
+    for shard_num, shard in enumerate(writer_results):
+      final_name = ''.join([
+          file_path_prefix, self.shard_name_format % dict(
+              shard_num=shard_num, num_shards=num_shards), file_name_suffix
+      ])
+      source_files.append(shard)
+      destination_files.append(final_name)
+
+    source_file_batch = [source_files[i:i + chunk_size]
+                         for i in xrange(0, len(source_files),
+                                         chunk_size)]
+    destination_file_batch = [destination_files[i:i + chunk_size]
+                              for i in xrange(0, len(destination_files),
+                                              chunk_size)]
+
+    logging.info(
+        'Starting finalize_write threads with num_shards: %d, '
+        'batches: %d, num_threads: %d',
+        num_shards, len(source_file_batch), num_threads)
+    start_time = time.time()
+
+    # Use a thread pool for renaming operations.
+    def _rename_batch(batch):
+      """_rename_batch executes batch rename operations."""
+      source_files, destination_files = batch
+      exceptions = []
+      try:
+        FileSystems.rename(source_files, destination_files)
+        return exceptions
+      except BeamIOError as exp:
+        if exp.exception_details is None:
+          raise
+        for (src, dest), exception in exp.exception_details.iteritems():
+          if exception:
+            logging.warning('Rename not successful: %s -> %s, %s', src, dest,
+                            exception)
+            should_report = True
+            if isinstance(exception, IOError):
+              # May have already been copied.
+              try:
+                if FileSystems.exists(dest):
+                  should_report = False
+              except Exception as exists_e:  # pylint: disable=broad-except
+                logging.warning('Exception when checking if file %s exists: '
+                                '%s', dest, exists_e)
+            if should_report:
+              logging.warning(('Exception in _rename_batch. src: %s, '
+                               'dest: %s, err: %s'), src, dest, exception)
+              exceptions.append(exception)
+          else:
+            logging.debug('Rename successful: %s -> %s', src, dest)
+        return exceptions
+
+    exception_batches = util.run_using_threadpool(
+        _rename_batch, zip(source_file_batch, destination_file_batch),
+        num_threads)
+
+    all_exceptions = [e for exception_batch in exception_batches
+                      for e in exception_batch]
+    if all_exceptions:
+      raise Exception('Encountered exceptions in finalize_write: %s',
+                      all_exceptions)
+
+    for final_name in destination_files:
+      yield final_name
+
+    logging.info('Renamed %d shards in %.2f seconds.', num_shards,
+                 time.time() - start_time)
+
+    try:
+      FileSystems.delete([init_result])
+    except IOError:
+      # May have already been removed.
+      pass
+
+  @staticmethod
+  def _template_to_format(shard_name_template):
+    if not shard_name_template:
+      return ''
+    m = re.search('S+', shard_name_template)
+    if m is None:
+      raise ValueError("Shard number pattern S+ not found in template '%s'" %
+                       shard_name_template)
+    shard_name_format = shard_name_template.replace(
+        m.group(0), '%%(shard_num)0%dd' % len(m.group(0)))
+    m = re.search('N+', shard_name_format)
+    if m:
+      shard_name_format = shard_name_format.replace(
+          m.group(0), '%%(num_shards)0%dd' % len(m.group(0)))
+    return shard_name_format
+
+  def __eq__(self, other):
+    # TODO: Clean up workitem_test which uses this.
+    # pylint: disable=unidiomatic-typecheck
+    return type(self) == type(other) and self.__dict__ == other.__dict__
+
+
+class FileBasedSinkWriter(iobase.Writer):
+  """The writer for FileBasedSink.
+  """
+
+  def __init__(self, sink, temp_shard_path):
+    self.sink = sink
+    self.temp_shard_path = temp_shard_path
+    self.temp_handle = self.sink.open(temp_shard_path)
+
+  def write(self, value):
+    self.sink.write_record(self.temp_handle, value)
+
+  def close(self):
+    self.sink.close(self.temp_handle)
+    return self.temp_shard_path

http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/io/filebasedsink_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsink_test.py b/sdks/python/apache_beam/io/filebasedsink_test.py
new file mode 100644
index 0000000..1f6aeee
--- /dev/null
+++ b/sdks/python/apache_beam/io/filebasedsink_test.py
@@ -0,0 +1,303 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for file sinks."""
+
+import glob
+import logging
+import os
+import shutil
+import tempfile
+import unittest
+
+import hamcrest as hc
+import mock
+
+import apache_beam as beam
+from apache_beam.coders import coders
+from apache_beam.io import filebasedsink
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.transforms.display import DisplayData
+from apache_beam.transforms.display_test import DisplayDataItemMatcher
+
+from apache_beam.options.value_provider import StaticValueProvider
+
+
+# TODO: Refactor code so all io tests use the same library
+# TestCaseWithTempDirCleanup class.
+class _TestCaseWithTempDirCleanUp(unittest.TestCase):
+  """Base class for TestCases that deals with TempDir clean-up.
+
+  Inherited test cases will call self._new_tempdir() to start a temporary dir
+  which will be deleted at the end of the tests (when tearDown() is called).
+  """
+
+  def setUp(self):
+    self._tempdirs = []
+
+  def tearDown(self):
+    for path in self._tempdirs:
+      if os.path.exists(path):
+        shutil.rmtree(path)
+    self._tempdirs = []
+
+  def _new_tempdir(self):
+    result = tempfile.mkdtemp()
+    self._tempdirs.append(result)
+    return result
+
+  def _create_temp_file(self, name='', suffix=''):
+    if not name:
+      name = tempfile.template
+    file_name = tempfile.NamedTemporaryFile(
+        delete=False, prefix=name,
+        dir=self._new_tempdir(), suffix=suffix).name
+    return file_name
+
+
+class MyFileBasedSink(filebasedsink.FileBasedSink):
+
+  def open(self, temp_path):
+    # TODO: Fix main session pickling.
+    # file_handle = super(MyFileBasedSink, self).open(temp_path)
+    file_handle = filebasedsink.FileBasedSink.open(self, temp_path)
+    file_handle.write('[start]')
+    return file_handle
+
+  def write_encoded_record(self, file_handle, encoded_value):
+    file_handle.write('[')
+    file_handle.write(encoded_value)
+    file_handle.write(']')
+
+  def close(self, file_handle):
+    file_handle.write('[end]')
+    # TODO: Fix main session pickling.
+    # file_handle = super(MyFileBasedSink, self).close(file_handle)
+    file_handle = filebasedsink.FileBasedSink.close(self, file_handle)
+
+
+class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
+
+  def test_file_sink_writing(self):
+    temp_path = os.path.join(self._new_tempdir(), 'FileBasedSink')
+    sink = MyFileBasedSink(
+        temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
+
+    # Manually invoke the generic Sink API.
+    init_token = sink.initialize_write()
+
+    writer1 = sink.open_writer(init_token, '1')
+    writer1.write('a')
+    writer1.write('b')
+    res1 = writer1.close()
+
+    writer2 = sink.open_writer(init_token, '2')
+    writer2.write('x')
+    writer2.write('y')
+    writer2.write('z')
+    res2 = writer2.close()
+
+    _ = list(sink.finalize_write(init_token, [res1, res2]))
+    # Retry the finalize operation (as if the first attempt was lost).
+    res = list(sink.finalize_write(init_token, [res1, res2]))
+
+    # Check the results.
+    shard1 = temp_path + '-00000-of-00002.output'
+    shard2 = temp_path + '-00001-of-00002.output'
+    self.assertEqual(res, [shard1, shard2])
+    self.assertEqual(open(shard1).read(), '[start][a][b][end]')
+    self.assertEqual(open(shard2).read(), '[start][x][y][z][end]')
+
+    # Check that any temp files are deleted.
+    self.assertItemsEqual([shard1, shard2], glob.glob(temp_path + '*'))
+
+  def test_file_sink_display_data(self):
+    temp_path = os.path.join(self._new_tempdir(), 'display')
+    sink = MyFileBasedSink(
+        temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
+    dd = DisplayData.create_from(sink)
+    expected_items = [
+        DisplayDataItemMatcher(
+            'compression', 'auto'),
+        DisplayDataItemMatcher(
+            'file_pattern',
+            '{}{}'.format(
+                temp_path,
+                '-%(shard_num)05d-of-%(num_shards)05d.output'))]
+    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
+  def test_empty_write(self):
+    temp_path = tempfile.NamedTemporaryFile().name
+    sink = MyFileBasedSink(
+        temp_path, file_name_suffix='.output', coder=coders.ToStringCoder()
+    )
+    p = TestPipeline()
+    p | beam.Create([]) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
+    p.run()
+    self.assertEqual(
+        open(temp_path + '-00000-of-00001.output').read(), '[start][end]')
+
+  def test_static_value_provider_empty_write(self):
+    temp_path = StaticValueProvider(value_type=str,
+                                    value=tempfile.NamedTemporaryFile().name)
+    sink = MyFileBasedSink(
+        temp_path,
+        file_name_suffix=StaticValueProvider(value_type=str, value='.output'),
+        coder=coders.ToStringCoder()
+    )
+    p = TestPipeline()
+    p | beam.Create([]) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
+    p.run()
+    self.assertEqual(
+        open(temp_path.get() + '-00000-of-00001.output').read(), '[start][end]')
+
+  def test_fixed_shard_write(self):
+    temp_path = os.path.join(self._new_tempdir(), 'empty')
+    sink = MyFileBasedSink(
+        temp_path,
+        file_name_suffix='.output',
+        num_shards=3,
+        shard_name_template='_NN_SSS_',
+        coder=coders.ToStringCoder())
+    p = TestPipeline()
+    p | beam.Create(['a', 'b']) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
+
+    p.run()
+
+    concat = ''.join(
+        open(temp_path + '_03_%03d_.output' % shard_num).read()
+        for shard_num in range(3))
+    self.assertTrue('][a][' in concat, concat)
+    self.assertTrue('][b][' in concat, concat)
+
+  # Not using 'test' in name so that 'nose' doesn't pick this as a test.
+  def run_temp_dir_check(self, no_dir_path, dir_path, no_dir_root_path,
+                         dir_root_path, prefix, separator):
+    def _get_temp_dir(file_path_prefix):
+      sink = MyFileBasedSink(
+          file_path_prefix, file_name_suffix='.output',
+          coder=coders.ToStringCoder())
+      return sink.initialize_write()
+
+    temp_dir = _get_temp_dir(no_dir_path)
+    self.assertTrue(temp_dir.startswith(prefix))
+    last_sep = temp_dir.rfind(separator)
+    self.assertTrue(temp_dir[last_sep + 1:].startswith('beam-temp'))
+
+    temp_dir = _get_temp_dir(dir_path)
+    self.assertTrue(temp_dir.startswith(prefix))
+    last_sep = temp_dir.rfind(separator)
+    self.assertTrue(temp_dir[last_sep + 1:].startswith('beam-temp'))
+
+    with self.assertRaises(ValueError):
+      _get_temp_dir(no_dir_root_path)
+
+    with self.assertRaises(ValueError):
+      _get_temp_dir(dir_root_path)
+
+  def test_temp_dir_gcs(self):
+    try:
+      self.run_temp_dir_check(
+          'gs://aaa/bbb', 'gs://aaa/bbb/', 'gs://aaa', 'gs://aaa/', 'gs://',
+          '/')
+    except ValueError:
+      logging.debug('Ignoring test since GCP module is not installed')
+
+  @mock.patch('apache_beam.io.localfilesystem.os')
+  def test_temp_dir_local(self, filesystem_os_mock):
+    # Here we test a unix-like mock file-system
+    # (not really testing Unix or Windows since we mock the function of 'os'
+    # module).
+
+    def _fake_unix_split(path):
+      sep = path.rfind('/')
+      if sep < 0:
+        raise ValueError('Path must contain a separator')
+      return (path[:sep], path[sep + 1:])
+
+    def _fake_unix_join(base, path):
+      return base + '/' + path
+
+    filesystem_os_mock.path.abspath = lambda a: a
+    filesystem_os_mock.path.split.side_effect = _fake_unix_split
+    filesystem_os_mock.path.join.side_effect = _fake_unix_join
+    self.run_temp_dir_check(
+        '/aaa/bbb', '/aaa/bbb/', '/', '/', '/', '/')
+
+  def test_file_sink_multi_shards(self):
+    temp_path = os.path.join(self._new_tempdir(), 'multishard')
+    sink = MyFileBasedSink(
+        temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
+
+    # Manually invoke the generic Sink API.
+    init_token = sink.initialize_write()
+
+    num_shards = 1000
+    writer_results = []
+    for i in range(num_shards):
+      uuid = 'uuid-%05d' % i
+      writer = sink.open_writer(init_token, uuid)
+      writer.write('a')
+      writer.write('b')
+      writer.write(uuid)
+      writer_results.append(writer.close())
+
+    res_first = list(sink.finalize_write(init_token, writer_results))
+    # Retry the finalize operation (as if the first attempt was lost).
+    res_second = list(sink.finalize_write(init_token, writer_results))
+
+    self.assertItemsEqual(res_first, res_second)
+
+    res = sorted(res_second)
+    for i in range(num_shards):
+      shard_name = '%s-%05d-of-%05d.output' % (temp_path, i, num_shards)
+      uuid = 'uuid-%05d' % i
+      self.assertEqual(res[i], shard_name)
+      self.assertEqual(
+          open(shard_name).read(), ('[start][a][b][%s][end]' % uuid))
+
+    # Check that any temp files are deleted.
+    self.assertItemsEqual(res, glob.glob(temp_path + '*'))
+
+  def test_file_sink_io_error(self):
+    temp_path = os.path.join(self._new_tempdir(), 'ioerror')
+    sink = MyFileBasedSink(
+        temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
+
+    # Manually invoke the generic Sink API.
+    init_token = sink.initialize_write()
+
+    writer1 = sink.open_writer(init_token, '1')
+    writer1.write('a')
+    writer1.write('b')
+    res1 = writer1.close()
+
+    writer2 = sink.open_writer(init_token, '2')
+    writer2.write('x')
+    writer2.write('y')
+    writer2.write('z')
+    res2 = writer2.close()
+
+    os.remove(res2)
+    with self.assertRaises(Exception):
+      list(sink.finalize_write(init_token, [res1, res2]))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
deleted file mode 100644
index aa18093..0000000
--- a/sdks/python/apache_beam/io/fileio.py
+++ /dev/null
@@ -1,304 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""File-based sources and sinks."""
-
-from __future__ import absolute_import
-
-import logging
-import os
-import re
-import time
-import uuid
-
-from apache_beam.internal import util
-from apache_beam.io import iobase
-from apache_beam.io.filesystem import BeamIOError
-from apache_beam.io.filesystem import CompressionTypes
-from apache_beam.io.filesystems import FileSystems
-from apache_beam.transforms.display import DisplayDataItem
-from apache_beam.options.value_provider import ValueProvider
-from apache_beam.options.value_provider import StaticValueProvider
-from apache_beam.options.value_provider import check_accessible
-
-DEFAULT_SHARD_NAME_TEMPLATE = '-SSSSS-of-NNNNN'
-
-__all__ = ['FileBasedSink']
-
-
-class FileSink(iobase.Sink):
-  """A sink to a GCS or local files.
-
-  To implement a file-based sink, extend this class and override
-  either ``write_record()`` or ``write_encoded_record()``.
-
-  If needed, also overwrite ``open()`` and/or ``close()`` to customize the
-  file handling or write headers and footers.
-
-  The output of this write is a PCollection of all written shards.
-  """
-
-  # Max number of threads to be used for renaming.
-  _MAX_RENAME_THREADS = 64
-
-  def __init__(self,
-               file_path_prefix,
-               coder,
-               file_name_suffix='',
-               num_shards=0,
-               shard_name_template=None,
-               mime_type='application/octet-stream',
-               compression_type=CompressionTypes.AUTO):
-    """
-     Raises:
-      TypeError: if file path parameters are not a string or ValueProvider,
-                 or if compression_type is not member of CompressionTypes.
-      ValueError: if shard_name_template is not of expected format.
-    """
-    if not isinstance(file_path_prefix, (basestring, ValueProvider)):
-      raise TypeError('file_path_prefix must be a string or ValueProvider;'
-                      'got %r instead' % file_path_prefix)
-    if not isinstance(file_name_suffix, (basestring, ValueProvider)):
-      raise TypeError('file_name_suffix must be a string or ValueProvider;'
-                      'got %r instead' % file_name_suffix)
-
-    if not CompressionTypes.is_valid_compression_type(compression_type):
-      raise TypeError('compression_type must be CompressionType object but '
-                      'was %s' % type(compression_type))
-    if shard_name_template is None:
-      shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE
-    elif shard_name_template == '':
-      num_shards = 1
-    if isinstance(file_path_prefix, basestring):
-      file_path_prefix = StaticValueProvider(str, file_path_prefix)
-    if isinstance(file_name_suffix, basestring):
-      file_name_suffix = StaticValueProvider(str, file_name_suffix)
-    self.file_path_prefix = file_path_prefix
-    self.file_name_suffix = file_name_suffix
-    self.num_shards = num_shards
-    self.coder = coder
-    self.shard_name_format = self._template_to_format(shard_name_template)
-    self.compression_type = compression_type
-    self.mime_type = mime_type
-
-  def display_data(self):
-    return {'shards':
-            DisplayDataItem(self.num_shards,
-                            label='Number of Shards').drop_if_default(0),
-            'compression':
-            DisplayDataItem(str(self.compression_type)),
-            'file_pattern':
-            DisplayDataItem('{}{}{}'.format(self.file_path_prefix,
-                                            self.shard_name_format,
-                                            self.file_name_suffix),
-                            label='File Pattern')}
-
-  @check_accessible(['file_path_prefix'])
-  def open(self, temp_path):
-    """Opens ``temp_path``, returning an opaque file handle object.
-
-    The returned file handle is passed to ``write_[encoded_]record`` and
-    ``close``.
-    """
-    return FileSystems.create(temp_path, self.mime_type, self.compression_type)
-
-  def write_record(self, file_handle, value):
-    """Writes a single record go the file handle returned by ``open()``.
-
-    By default, calls ``write_encoded_record`` after encoding the record with
-    this sink's Coder.
-    """
-    self.write_encoded_record(file_handle, self.coder.encode(value))
-
-  def write_encoded_record(self, file_handle, encoded_value):
-    """Writes a single encoded record to the file handle returned by ``open()``.
-    """
-    raise NotImplementedError
-
-  def close(self, file_handle):
-    """Finalize and close the file handle returned from ``open()``.
-
-    Called after all records are written.
-
-    By default, calls ``file_handle.close()`` iff it is not None.
-    """
-    if file_handle is not None:
-      file_handle.close()
-
-  @check_accessible(['file_path_prefix', 'file_name_suffix'])
-  def initialize_write(self):
-    file_path_prefix = self.file_path_prefix.get()
-
-    tmp_dir = self._create_temp_dir(file_path_prefix)
-    FileSystems.mkdirs(tmp_dir)
-    return tmp_dir
-
-  def _create_temp_dir(self, file_path_prefix):
-    base_path, last_component = FileSystems.split(file_path_prefix)
-    if not last_component:
-      # Trying to re-split the base_path to check if it's a root.
-      new_base_path, _ = FileSystems.split(base_path)
-      if base_path == new_base_path:
-        raise ValueError('Cannot create a temporary directory for root path '
-                         'prefix %s. Please specify a file path prefix with '
-                         'at least two components.',
-                         file_path_prefix)
-    path_components = [base_path,
-                       'beam-temp-' + last_component + '-' + uuid.uuid1().hex]
-    return FileSystems.join(*path_components)
-
-  @check_accessible(['file_path_prefix', 'file_name_suffix'])
-  def open_writer(self, init_result, uid):
-    # A proper suffix is needed for AUTO compression detection.
-    # We also ensure there will be no collisions with uid and a
-    # (possibly unsharded) file_path_prefix and a (possibly empty)
-    # file_name_suffix.
-    file_path_prefix = self.file_path_prefix.get()
-    file_name_suffix = self.file_name_suffix.get()
-    suffix = (
-        '.' + os.path.basename(file_path_prefix) + file_name_suffix)
-    return FileSinkWriter(self, os.path.join(init_result, uid) + suffix)
-
-  @check_accessible(['file_path_prefix', 'file_name_suffix'])
-  def finalize_write(self, init_result, writer_results):
-    file_path_prefix = self.file_path_prefix.get()
-    file_name_suffix = self.file_name_suffix.get()
-    writer_results = sorted(writer_results)
-    num_shards = len(writer_results)
-    min_threads = min(num_shards, FileSink._MAX_RENAME_THREADS)
-    num_threads = max(1, min_threads)
-
-    source_files = []
-    destination_files = []
-    chunk_size = FileSystems.get_chunk_size(file_path_prefix)
-    for shard_num, shard in enumerate(writer_results):
-      final_name = ''.join([
-          file_path_prefix, self.shard_name_format % dict(
-              shard_num=shard_num, num_shards=num_shards), file_name_suffix
-      ])
-      source_files.append(shard)
-      destination_files.append(final_name)
-
-    source_file_batch = [source_files[i:i + chunk_size]
-                         for i in xrange(0, len(source_files),
-                                         chunk_size)]
-    destination_file_batch = [destination_files[i:i + chunk_size]
-                              for i in xrange(0, len(destination_files),
-                                              chunk_size)]
-
-    logging.info(
-        'Starting finalize_write threads with num_shards: %d, '
-        'batches: %d, num_threads: %d',
-        num_shards, len(source_file_batch), num_threads)
-    start_time = time.time()
-
-    # Use a thread pool for renaming operations.
-    def _rename_batch(batch):
-      """_rename_batch executes batch rename operations."""
-      source_files, destination_files = batch
-      exceptions = []
-      try:
-        FileSystems.rename(source_files, destination_files)
-        return exceptions
-      except BeamIOError as exp:
-        if exp.exception_details is None:
-          raise
-        for (src, dest), exception in exp.exception_details.iteritems():
-          if exception:
-            logging.warning('Rename not successful: %s -> %s, %s', src, dest,
-                            exception)
-            should_report = True
-            if isinstance(exception, IOError):
-              # May have already been copied.
-              try:
-                if FileSystems.exists(dest):
-                  should_report = False
-              except Exception as exists_e:  # pylint: disable=broad-except
-                logging.warning('Exception when checking if file %s exists: '
-                                '%s', dest, exists_e)
-            if should_report:
-              logging.warning(('Exception in _rename_batch. src: %s, '
-                               'dest: %s, err: %s'), src, dest, exception)
-              exceptions.append(exception)
-          else:
-            logging.debug('Rename successful: %s -> %s', src, dest)
-        return exceptions
-
-    exception_batches = util.run_using_threadpool(
-        _rename_batch, zip(source_file_batch, destination_file_batch),
-        num_threads)
-
-    all_exceptions = [e for exception_batch in exception_batches
-                      for e in exception_batch]
-    if all_exceptions:
-      raise Exception('Encountered exceptions in finalize_write: %s',
-                      all_exceptions)
-
-    for final_name in destination_files:
-      yield final_name
-
-    logging.info('Renamed %d shards in %.2f seconds.', num_shards,
-                 time.time() - start_time)
-
-    try:
-      FileSystems.delete([init_result])
-    except IOError:
-      # May have already been removed.
-      pass
-
-  @staticmethod
-  def _template_to_format(shard_name_template):
-    if not shard_name_template:
-      return ''
-    m = re.search('S+', shard_name_template)
-    if m is None:
-      raise ValueError("Shard number pattern S+ not found in template '%s'" %
-                       shard_name_template)
-    shard_name_format = shard_name_template.replace(
-        m.group(0), '%%(shard_num)0%dd' % len(m.group(0)))
-    m = re.search('N+', shard_name_format)
-    if m:
-      shard_name_format = shard_name_format.replace(
-          m.group(0), '%%(num_shards)0%dd' % len(m.group(0)))
-    return shard_name_format
-
-  def __eq__(self, other):
-    # TODO: Clean up workitem_test which uses this.
-    # pylint: disable=unidiomatic-typecheck
-    return type(self) == type(other) and self.__dict__ == other.__dict__
-
-
-# Using FileBasedSink for the public API to be symmetric with FileBasedSource.
-# TODO: move code from FileSink to here and delete that class.
-FileBasedSink = FileSink
-
-
-class FileSinkWriter(iobase.Writer):
-  """The writer for FileSink.
-  """
-
-  def __init__(self, sink, temp_shard_path):
-    self.sink = sink
-    self.temp_shard_path = temp_shard_path
-    self.temp_handle = self.sink.open(temp_shard_path)
-
-  def write(self, value):
-    self.sink.write_record(self.temp_handle, value)
-
-  def close(self):
-    self.sink.close(self.temp_handle)
-    return self.temp_shard_path

http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
deleted file mode 100644
index b92b8be..0000000
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ /dev/null
@@ -1,303 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for file sinks."""
-
-import glob
-import logging
-import os
-import shutil
-import tempfile
-import unittest
-
-import hamcrest as hc
-import mock
-
-import apache_beam as beam
-from apache_beam.coders import coders
-from apache_beam.io import fileio
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.transforms.display import DisplayData
-from apache_beam.transforms.display_test import DisplayDataItemMatcher
-
-from apache_beam.options.value_provider import StaticValueProvider
-
-
-# TODO: Refactor code so all io tests are using same library
-# TestCaseWithTempDirCleanup class.
-class _TestCaseWithTempDirCleanUp(unittest.TestCase):
-  """Base class for TestCases that deals with TempDir clean-up.
-
-  Inherited test cases will call self._new_tempdir() to start a temporary dir
-  which will be deleted at the end of the tests (when tearDown() is called).
-  """
-
-  def setUp(self):
-    self._tempdirs = []
-
-  def tearDown(self):
-    for path in self._tempdirs:
-      if os.path.exists(path):
-        shutil.rmtree(path)
-    self._tempdirs = []
-
-  def _new_tempdir(self):
-    result = tempfile.mkdtemp()
-    self._tempdirs.append(result)
-    return result
-
-  def _create_temp_file(self, name='', suffix=''):
-    if not name:
-      name = tempfile.template
-    file_name = tempfile.NamedTemporaryFile(
-        delete=False, prefix=name,
-        dir=self._new_tempdir(), suffix=suffix).name
-    return file_name
-
-
-class MyFileSink(fileio.FileSink):
-
-  def open(self, temp_path):
-    # TODO: Fix main session pickling.
-    # file_handle = super(MyFileSink, self).open(temp_path)
-    file_handle = fileio.FileSink.open(self, temp_path)
-    file_handle.write('[start]')
-    return file_handle
-
-  def write_encoded_record(self, file_handle, encoded_value):
-    file_handle.write('[')
-    file_handle.write(encoded_value)
-    file_handle.write(']')
-
-  def close(self, file_handle):
-    file_handle.write('[end]')
-    # TODO: Fix main session pickling.
-    # file_handle = super(MyFileSink, self).close(file_handle)
-    file_handle = fileio.FileSink.close(self, file_handle)
-
-
-class TestFileSink(_TestCaseWithTempDirCleanUp):
-
-  def test_file_sink_writing(self):
-    temp_path = os.path.join(self._new_tempdir(), 'filesink')
-    sink = MyFileSink(
-        temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
-
-    # Manually invoke the generic Sink API.
-    init_token = sink.initialize_write()
-
-    writer1 = sink.open_writer(init_token, '1')
-    writer1.write('a')
-    writer1.write('b')
-    res1 = writer1.close()
-
-    writer2 = sink.open_writer(init_token, '2')
-    writer2.write('x')
-    writer2.write('y')
-    writer2.write('z')
-    res2 = writer2.close()
-
-    _ = list(sink.finalize_write(init_token, [res1, res2]))
-    # Retry the finalize operation (as if the first attempt was lost).
-    res = list(sink.finalize_write(init_token, [res1, res2]))
-
-    # Check the results.
-    shard1 = temp_path + '-00000-of-00002.output'
-    shard2 = temp_path + '-00001-of-00002.output'
-    self.assertEqual(res, [shard1, shard2])
-    self.assertEqual(open(shard1).read(), '[start][a][b][end]')
-    self.assertEqual(open(shard2).read(), '[start][x][y][z][end]')
-
-    # Check that any temp files are deleted.
-    self.assertItemsEqual([shard1, shard2], glob.glob(temp_path + '*'))
-
-  def test_file_sink_display_data(self):
-    temp_path = os.path.join(self._new_tempdir(), 'display')
-    sink = MyFileSink(
-        temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
-    dd = DisplayData.create_from(sink)
-    expected_items = [
-        DisplayDataItemMatcher(
-            'compression', 'auto'),
-        DisplayDataItemMatcher(
-            'file_pattern',
-            '{}{}'.format(
-                temp_path,
-                '-%(shard_num)05d-of-%(num_shards)05d.output'))]
-    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
-  def test_empty_write(self):
-    temp_path = tempfile.NamedTemporaryFile().name
-    sink = MyFileSink(
-        temp_path, file_name_suffix='.output', coder=coders.ToStringCoder()
-    )
-    p = TestPipeline()
-    p | beam.Create([]) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
-    p.run()
-    self.assertEqual(
-        open(temp_path + '-00000-of-00001.output').read(), '[start][end]')
-
-  def test_static_value_provider_empty_write(self):
-    temp_path = StaticValueProvider(value_type=str,
-                                    value=tempfile.NamedTemporaryFile().name)
-    sink = MyFileSink(
-        temp_path,
-        file_name_suffix=StaticValueProvider(value_type=str, value='.output'),
-        coder=coders.ToStringCoder()
-    )
-    p = TestPipeline()
-    p | beam.Create([]) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
-    p.run()
-    self.assertEqual(
-        open(temp_path.get() + '-00000-of-00001.output').read(), '[start][end]')
-
-  def test_fixed_shard_write(self):
-    temp_path = os.path.join(self._new_tempdir(), 'empty')
-    sink = MyFileSink(
-        temp_path,
-        file_name_suffix='.output',
-        num_shards=3,
-        shard_name_template='_NN_SSS_',
-        coder=coders.ToStringCoder())
-    p = TestPipeline()
-    p | beam.Create(['a', 'b']) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
-
-    p.run()
-
-    concat = ''.join(
-        open(temp_path + '_03_%03d_.output' % shard_num).read()
-        for shard_num in range(3))
-    self.assertTrue('][a][' in concat, concat)
-    self.assertTrue('][b][' in concat, concat)
-
-  # Not using 'test' in name so that 'nose' doesn't pick this as a test.
-  def run_temp_dir_check(self, no_dir_path, dir_path, no_dir_root_path,
-                         dir_root_path, prefix, separator):
-    def _get_temp_dir(file_path_prefix):
-      sink = MyFileSink(
-          file_path_prefix, file_name_suffix='.output',
-          coder=coders.ToStringCoder())
-      return sink.initialize_write()
-
-    temp_dir = _get_temp_dir(no_dir_path)
-    self.assertTrue(temp_dir.startswith(prefix))
-    last_sep = temp_dir.rfind(separator)
-    self.assertTrue(temp_dir[last_sep + 1:].startswith('beam-temp'))
-
-    temp_dir = _get_temp_dir(dir_path)
-    self.assertTrue(temp_dir.startswith(prefix))
-    last_sep = temp_dir.rfind(separator)
-    self.assertTrue(temp_dir[last_sep + 1:].startswith('beam-temp'))
-
-    with self.assertRaises(ValueError):
-      _get_temp_dir(no_dir_root_path)
-
-    with self.assertRaises(ValueError):
-      _get_temp_dir(dir_root_path)
-
-  def test_temp_dir_gcs(self):
-    try:
-      self.run_temp_dir_check(
-          'gs://aaa/bbb', 'gs://aaa/bbb/', 'gs://aaa', 'gs://aaa/', 'gs://',
-          '/')
-    except ValueError:
-      logging.debug('Ignoring test since GCP module is not installed')
-
-  @mock.patch('apache_beam.io.localfilesystem.os')
-  def test_temp_dir_local(self, filesystem_os_mock):
-    # Here we test a unix-like mock file-system
-    # (not really testing Unix or Windows since we mock the function of 'os'
-    # module).
-
-    def _fake_unix_split(path):
-      sep = path.rfind('/')
-      if sep < 0:
-        raise ValueError('Path must contain a separator')
-      return (path[:sep], path[sep + 1:])
-
-    def _fake_unix_join(base, path):
-      return base + '/' + path
-
-    filesystem_os_mock.path.abspath = lambda a: a
-    filesystem_os_mock.path.split.side_effect = _fake_unix_split
-    filesystem_os_mock.path.join.side_effect = _fake_unix_join
-    self.run_temp_dir_check(
-        '/aaa/bbb', '/aaa/bbb/', '/', '/', '/', '/')
-
-  def test_file_sink_multi_shards(self):
-    temp_path = os.path.join(self._new_tempdir(), 'multishard')
-    sink = MyFileSink(
-        temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
-
-    # Manually invoke the generic Sink API.
-    init_token = sink.initialize_write()
-
-    num_shards = 1000
-    writer_results = []
-    for i in range(num_shards):
-      uuid = 'uuid-%05d' % i
-      writer = sink.open_writer(init_token, uuid)
-      writer.write('a')
-      writer.write('b')
-      writer.write(uuid)
-      writer_results.append(writer.close())
-
-    res_first = list(sink.finalize_write(init_token, writer_results))
-    # Retry the finalize operation (as if the first attempt was lost).
-    res_second = list(sink.finalize_write(init_token, writer_results))
-
-    self.assertItemsEqual(res_first, res_second)
-
-    res = sorted(res_second)
-    for i in range(num_shards):
-      shard_name = '%s-%05d-of-%05d.output' % (temp_path, i, num_shards)
-      uuid = 'uuid-%05d' % i
-      self.assertEqual(res[i], shard_name)
-      self.assertEqual(
-          open(shard_name).read(), ('[start][a][b][%s][end]' % uuid))
-
-    # Check that any temp files are deleted.
-    self.assertItemsEqual(res, glob.glob(temp_path + '*'))
-
-  def test_file_sink_io_error(self):
-    temp_path = os.path.join(self._new_tempdir(), 'ioerror')
-    sink = MyFileSink(
-        temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
-
-    # Manually invoke the generic Sink API.
-    init_token = sink.initialize_write()
-
-    writer1 = sink.open_writer(init_token, '1')
-    writer1.write('a')
-    writer1.write('b')
-    res1 = writer1.close()
-
-    writer2 = sink.open_writer(init_token, '2')
-    writer2.write('x')
-    writer2.write('y')
-    writer2.write('z')
-    res2 = writer2.close()
-
-    os.remove(res2)
-    with self.assertRaises(Exception):
-      list(sink.finalize_write(init_token, [res1, res2]))
-
-
-if __name__ == '__main__':
-  logging.getLogger().setLevel(logging.INFO)
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/io/gcp/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 774ee54..7e21586 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -257,9 +257,9 @@ class GcsIO(object):
       self.client.objects.Copy(request)
     except HttpError as http_error:
       if http_error.status_code == 404:
-        # This is a permanent error that should not be retried.  Note that
-        # FileSink.finalize_write expects an IOError when the source file does
-        # not exist.
+        # This is a permanent error that should not be retried. Note that
+        # FileBasedSink.finalize_write expects an IOError when the source
+        # file does not exist.
         raise GcsIOError(errno.ENOENT, 'Source file not found: %s' % src)
       raise
 

http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/io/iobase.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index a80b12f..7e40d83 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -562,7 +562,9 @@ class RangeTracker(object):
 
 
 class Sink(HasDisplayData):
-  """A resource that can be written to using the ``beam.io.Write`` transform.
+  """This class is deprecated, no backwards-compatibility guarantees.
+
+  A resource that can be written to using the ``beam.io.Write`` transform.
 
   Here ``beam`` stands for Apache Beam Python code imported in following manner.
   ``import apache_beam as beam``.
@@ -594,8 +596,8 @@ class Sink(HasDisplayData):
   single record from the bundle and ``close()`` which is called once
   at the end of writing a bundle.
 
-  See also ``apache_beam.io.fileio.FileSink`` which provides a simpler API
-  for writing sinks that produce files.
+  See also ``apache_beam.io.filebasedsink.FileBasedSink`` which provides a
+  simpler API for writing sinks that produce files.
 
   **Execution of the Write transform**
 
@@ -759,7 +761,9 @@ class Sink(HasDisplayData):
 
 
 class Writer(object):
-  """Writes a bundle of elements from a ``PCollection`` to a sink.
+  """This class is deprecated, no backwards-compatibility guarantees.
+
+  Writes a bundle of elements from a ``PCollection`` to a sink.
 
   A Writer  ``iobase.Writer.write()`` writes and elements to the sink while
   ``iobase.Writer.close()`` is called after all elements in the bundle have been

http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index d43f4fc..eeefaf6 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -23,7 +23,7 @@ import logging
 
 from apache_beam.coders import coders
 from apache_beam.io import filebasedsource
-from apache_beam.io import fileio
+from apache_beam.io import filebasedsink
 from apache_beam.io import iobase
 from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.iobase import Read
@@ -262,7 +262,7 @@ class _TextSource(filebasedsource.FileBasedSource):
               sep_bounds[1] - record_start_position_in_buffer)
 
 
-class _TextSink(fileio.FileSink):
+class _TextSink(filebasedsink.FileBasedSink):
   """A sink to a GCS or local text file or files."""
 
   def __init__(self,

http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/io/tfrecordio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/tfrecordio.py b/sdks/python/apache_beam/io/tfrecordio.py
index 2f7f4dc..a8cd1ce 100644
--- a/sdks/python/apache_beam/io/tfrecordio.py
+++ b/sdks/python/apache_beam/io/tfrecordio.py
@@ -23,7 +23,7 @@ import struct
 
 from apache_beam import coders
 from apache_beam.io import filebasedsource
-from apache_beam.io import fileio
+from apache_beam.io import filebasedsink
 from apache_beam.io.filesystem import CompressionTypes
 from apache_beam.io.iobase import Read
 from apache_beam.io.iobase import Write
@@ -210,7 +210,7 @@ class ReadFromTFRecord(PTransform):
     return pvalue.pipeline | Read(self._source)
 
 
-class _TFRecordSink(fileio.FileSink):
+class _TFRecordSink(filebasedsink.FileBasedSink):
   """Sink for writing TFRecords files.
 
   For detailed TFRecord format description see:
@@ -242,7 +242,7 @@ class WriteToTFRecord(PTransform):
                coder=coders.BytesCoder(),
                file_name_suffix='',
                num_shards=0,
-               shard_name_template=fileio.DEFAULT_SHARD_NAME_TEMPLATE,
+               shard_name_template=filebasedsink.DEFAULT_SHARD_NAME_TEMPLATE,
                compression_type=CompressionTypes.AUTO,
                **kwargs):
     """Initialize WriteToTFRecord transform.

http://git-wip-us.apache.org/repos/asf/beam/blob/717ab8c1/sdks/python/apache_beam/testing/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/pipeline_verifiers.py b/sdks/python/apache_beam/testing/pipeline_verifiers.py
index a08eb54..883343a 100644
--- a/sdks/python/apache_beam/testing/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/testing/pipeline_verifiers.py
@@ -81,8 +81,8 @@ def retry_on_io_error_and_server_error(exception):
 class FileChecksumMatcher(BaseMatcher):
   """Matcher that verifies file(s) content by comparing file checksum.
 
-  Use apache_beam.io.fileio to fetch file(s) from given path. File checksum
-  is a hash string computed from content of file(s).
+  Use apache_beam.io.filebasedsink to fetch file(s) from given path.
+  File checksum is a hash string computed from content of file(s).
   """
 
   def __init__(self, file_path, expected_checksum, sleep_secs=None):


Mime
View raw message