beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chamik...@apache.org
Subject [1/2] beam git commit: [BEAM-1585] Filesystems should be picked using the provided scheme
Date Fri, 05 May 2017 21:23:11 GMT
Repository: beam
Updated Branches:
  refs/heads/master 50262be1f -> 1ecb111d3


[BEAM-1585] Filesystems should be picked using the provided scheme


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cbe182ee
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cbe182ee
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cbe182ee

Branch: refs/heads/master
Commit: cbe182eee203c92073a25bbcd39f86f8440a5fc2
Parents: 50262be
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Fri May 5 11:00:58 2017 -0700
Committer: chamikara@google.com <chamikara@google.com>
Committed: Fri May 5 14:22:49 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio_test.py       |  3 +-
 sdks/python/apache_beam/io/filesystem.py        | 16 +++++++++
 sdks/python/apache_beam/io/filesystems.py       | 36 ++++++++++++++++++--
 sdks/python/apache_beam/io/filesystems_test.py  | 15 +++++++-
 sdks/python/apache_beam/io/filesystems_util.py  | 36 --------------------
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py |  6 ++++
 .../apache_beam/io/gcp/gcsfilesystem_test.py    |  5 +++
 sdks/python/apache_beam/io/localfilesystem.py   |  5 +++
 .../apache_beam/io/localfilesystem_test.py      |  4 +++
 9 files changed, 84 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cbe182ee/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 13778d5..3f7211e 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -31,7 +31,6 @@ import mock
 import apache_beam as beam
 from apache_beam import coders
 from apache_beam.io import fileio
-from apache_beam.io.filesystem import BeamIOError
 from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
@@ -216,7 +215,7 @@ class TestFileSink(_TestCaseWithTempDirCleanUp):
       self.run_temp_dir_check(
           'gs://aaa/bbb', 'gs://aaa/bbb/', 'gs://aaa', 'gs://aaa/', 'gs://',
           '/')
-    except BeamIOError:
+    except ValueError:
       logging.debug('Ignoring test since GCP module is not installed')
 
   @mock.patch('apache_beam.io.localfilesystem.os')

http://git-wip-us.apache.org/repos/asf/beam/blob/cbe182ee/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index ff8af03..3d35f3e 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -425,6 +425,22 @@ class FileSystem(object):
                       'was %s' % type(compression_type))
     return compression_type
 
+  @classmethod
+  def get_all_subclasses(cls):
+    """Get all the subclasses of the FileSystem class
+    """
+    all_subclasses = []
+    for subclass in cls.__subclasses__():
+      all_subclasses.append(subclass)
+      all_subclasses.extend(subclass.get_all_subclasses())
+    return all_subclasses
+
+  @classmethod
+  def scheme(cls):
+    """URI scheme for the FileSystem
+    """
+    raise NotImplementedError
+
   @abc.abstractmethod
   def join(self, basepath, *paths):
     """Join two or more pathname components for the filesystem

http://git-wip-us.apache.org/repos/asf/beam/blob/cbe182ee/sdks/python/apache_beam/io/filesystems.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py
index 225a857..29f0644 100644
--- a/sdks/python/apache_beam/io/filesystems.py
+++ b/sdks/python/apache_beam/io/filesystems.py
@@ -17,24 +17,54 @@
 
 """FileSystems interface class for accessing the correct filesystem"""
 
+import re
+
 from apache_beam.io.filesystem import BeamIOError
 from apache_beam.io.filesystem import CompressionTypes
-from apache_beam.io.filesystems_util import get_filesystem
+from apache_beam.io.filesystem import FileSystem
+
+# All filesystem implementations should be added here
+# pylint: disable=wrong-import-position, unused-import
+from apache_beam.io.localfilesystem import LocalFileSystem
+
+try:
+  from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+except ImportError:
+  pass
+# pylint: enable=wrong-import-position, unused-import
 
 
 class FileSystems(object):
   """A class that defines the functions that can be performed on a filesystem.
   All methods are static and access the underlying registered filesystems.
   """
+  URI_SCHEMA_PATTERN = re.compile('(?P<scheme>[a-zA-Z][-a-zA-Z0-9+.]*)://.*')
+
+  @staticmethod
+  def get_scheme(path):
+    match_result = FileSystems.URI_SCHEMA_PATTERN.match(path.strip())
+    if match_result is None:
+      return None
+    return match_result.groupdict()['scheme']
 
   @staticmethod
   def get_filesystem(path):
     """Get the correct filesystem for the specified path
     """
     try:
-      return get_filesystem(path)
+      path_scheme = FileSystems.get_scheme(path)
+      systems = [fs for fs in FileSystem.get_all_subclasses()
+                 if fs.scheme() == path_scheme]
+      if len(systems) == 0:
+        raise ValueError('Unable to get the Filesystem for path %s' % path)
+      elif len(systems) == 1:
+        return systems[0]()
+      else:
+        raise ValueError('Found more than one filesystem for path %s' % path)
+    except ValueError:
+      raise
     except Exception as e:
-      raise BeamIOError('Enable to get the Filesystem', {path: e})
+      raise BeamIOError('Unable to get the Filesystem', {path: e})
 
   @staticmethod
   def join(basepath, *paths):

http://git-wip-us.apache.org/repos/asf/beam/blob/cbe182ee/sdks/python/apache_beam/io/filesystems_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystems_test.py b/sdks/python/apache_beam/io/filesystems_test.py
index 9165586..9a6f013 100644
--- a/sdks/python/apache_beam/io/filesystems_test.py
+++ b/sdks/python/apache_beam/io/filesystems_test.py
@@ -48,6 +48,19 @@ class FileSystemsTest(unittest.TestCase):
   def tearDown(self):
     shutil.rmtree(self.tmpdir)
 
+  def test_get_scheme(self):
+    self.assertIsNone(FileSystems.get_scheme('/abc/cdf'))
+    self.assertIsNone(FileSystems.get_scheme('c:\\abc\cdf'))  # pylint: disable=anomalous-backslash-in-string
+    self.assertEqual(FileSystems.get_scheme('gs://abc/cdf'), 'gs')
+
+  def test_get_filesystem(self):
+    self.assertTrue(isinstance(FileSystems.get_filesystem('/tmp'),
+                               localfilesystem.LocalFileSystem))
+    self.assertTrue(isinstance(FileSystems.get_filesystem('c:\\abc\def'),  # pylint: disable=anomalous-backslash-in-string
+                               localfilesystem.LocalFileSystem))
+    with self.assertRaises(ValueError):
+      FileSystems.get_filesystem('error://abc/def')
+
   @mock.patch('apache_beam.io.localfilesystem.os')
   def test_unix_path_join(self, *unused_mocks):
     # Test joining of Unix paths.
@@ -110,7 +123,7 @@ class FileSystemsTest(unittest.TestCase):
     with self.assertRaises(BeamIOError) as error:
       FileSystems.match([None])
     self.assertTrue(
-        error.exception.message.startswith('Enable to get the Filesystem'))
+        error.exception.message.startswith('Unable to get the Filesystem'))
     self.assertEqual(error.exception.exception_details.keys(), [None])
 
   def test_match_directory(self):

http://git-wip-us.apache.org/repos/asf/beam/blob/cbe182ee/sdks/python/apache_beam/io/filesystems_util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystems_util.py b/sdks/python/apache_beam/io/filesystems_util.py
deleted file mode 100644
index 6d21298..0000000
--- a/sdks/python/apache_beam/io/filesystems_util.py
+++ /dev/null
@@ -1,36 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-"""Utility functions for getting the correct file systems for a file name"""
-
-from apache_beam.io.localfilesystem import LocalFileSystem
-
-
-# TODO(BEAM-1585): Add a mechanism to add user implemented file systems
-def get_filesystem(path):
-  """Function that returns the FileSystem class to use based on the path
-  provided in the input.
-  """
-  if path.startswith('gs://'):
-    try:
-      from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
-    except ImportError:
-      raise ImportError(
-          'Google Cloud Platform IO not available, '
-          'please install apache_beam[gcp]')
-    return GCSFileSystem()
-  else:
-    return LocalFileSystem()

http://git-wip-us.apache.org/repos/asf/beam/blob/cbe182ee/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index ba00f50..dc71fce 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -34,6 +34,12 @@ class GCSFileSystem(FileSystem):
   CHUNK_SIZE = gcsio.MAX_BATCH_OPERATION_SIZE  # Chuck size in batch operations
   GCS_PREFIX = 'gs://'
 
+  @classmethod
+  def scheme(cls):
+    """URI scheme for the FileSystem
+    """
+    return 'gs'
+
   def join(self, basepath, *paths):
     """Join two or more pathname components for the filesystem
 

http://git-wip-us.apache.org/repos/asf/beam/blob/cbe182ee/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index 5fb9a62..923fc7d 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -36,6 +36,11 @@ except ImportError:
 @unittest.skipIf(gcsfilesystem is None, 'GCP dependencies are not installed')
 class GCSFileSystemTest(unittest.TestCase):
 
+  def test_scheme(self):
+    file_system = gcsfilesystem.GCSFileSystem()
+    self.assertEqual(file_system.scheme(), 'gs')
+    self.assertEqual(gcsfilesystem.GCSFileSystem.scheme(), 'gs')
+
   def test_join(self):
     file_system = gcsfilesystem.GCSFileSystem()
     self.assertEqual('gs://bucket/path/to/file',

http://git-wip-us.apache.org/repos/asf/beam/blob/cbe182ee/sdks/python/apache_beam/io/localfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index bc7c576..c670704 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -33,6 +33,11 @@ from apache_beam.io.filesystem import MatchResult
 class LocalFileSystem(FileSystem):
   """A Local ``FileSystem`` implementation for accessing files on disk.
   """
+  @classmethod
+  def scheme(cls):
+    """URI scheme for the FileSystem
+    """
+    return None
 
   def join(self, basepath, *paths):
     """Join two or more pathname components for the filesystem

http://git-wip-us.apache.org/repos/asf/beam/blob/cbe182ee/sdks/python/apache_beam/io/localfilesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py
index 986dcaf..04cf5b7 100644
--- a/sdks/python/apache_beam/io/localfilesystem_test.py
+++ b/sdks/python/apache_beam/io/localfilesystem_test.py
@@ -61,6 +61,10 @@ class LocalFileSystemTest(unittest.TestCase):
   def tearDown(self):
     shutil.rmtree(self.tmpdir)
 
+  def test_scheme(self):
+    self.assertIsNone(self.fs.scheme())
+    self.assertIsNone(localfilesystem.LocalFileSystem.scheme())
+
   @mock.patch('apache_beam.io.localfilesystem.os')
   def test_unix_path_join(self, *unused_mocks):
     # Test joining of Unix paths.


Mime
View raw message