beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: [BEAM-1708] Improve error message when GCP not installed
Date Thu, 13 Apr 2017 21:55:04 GMT
Repository: beam
Updated Branches:
  refs/heads/master 69343a609 -> 75d7b273c


[BEAM-1708] Improve error message when GCP not installed


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/233a9bd3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/233a9bd3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/233a9bd3

Branch: refs/heads/master
Commit: 233a9bd3c503182dee004ac52acf37ace4eeac12
Parents: 69343a6
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Mon Apr 10 14:45:01 2017 -0700
Committer: Ahmet Altay <altay@google.com>
Committed: Thu Apr 13 14:54:37 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filesystems_util.py  |  7 ++++++-
 sdks/python/apache_beam/io/gcp/bigquery.py      | 19 +++++++++++++++++++
 .../io/gcp/datastore/v1/datastoreio.py          | 20 ++++++++++++++++++++
 .../runners/dataflow/dataflow_runner.py         |  9 +++++++--
 sdks/python/apache_beam/runners/runner.py       | 10 +++++++++-
 5 files changed, 61 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/233a9bd3/sdks/python/apache_beam/io/filesystems_util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystems_util.py b/sdks/python/apache_beam/io/filesystems_util.py
index 47c2361..6d21298 100644
--- a/sdks/python/apache_beam/io/filesystems_util.py
+++ b/sdks/python/apache_beam/io/filesystems_util.py
@@ -25,7 +25,12 @@ def get_filesystem(path):
   provided in the input.
   """
   if path.startswith('gs://'):
-    from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+    try:
+      from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem
+    except ImportError:
+      raise ImportError(
+          'Google Cloud Platform IO not available, '
+          'please install apache_beam[gcp]')
     return GCSFileSystem()
   else:
     return LocalFileSystem()

http://git-wip-us.apache.org/repos/asf/beam/blob/233a9bd3/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 788c069..9a8174a 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -367,6 +367,15 @@ class BigQuerySource(dataflow_io.NativeSource):
       (3) both a table and a query is specified.
     """
 
+    # Import here to avoid adding the dependency for local running scenarios.
+    try:
+      # pylint: disable=wrong-import-order, wrong-import-position
+      from apitools.base.py import *
+    except ImportError:
+      raise ImportError(
+          'Google Cloud IO not available, '
+          'please install apache_beam[gcp]')
+
     if table is not None and query is not None:
       raise ValueError('Both a BigQuery table and a query were specified.'
                        ' Please specify only one of these.')
@@ -467,6 +476,16 @@ class BigQuerySink(dataflow_io.NativeSink):
       ValueError: if the table reference as a string does not match the expected
       format.
     """
+
+    # Import here to avoid adding the dependency for local running scenarios.
+    try:
+      # pylint: disable=wrong-import-order, wrong-import-position
+      from apitools.base.py import *
+    except ImportError:
+      raise ImportError(
+          'Google Cloud IO not available, '
+          'please install apache_beam[gcp]')
+
     self.table_reference = _parse_table_reference(table, dataset, project)
     # Transform the table schema into a bigquery.TableSchema instance.
     if isinstance(schema, basestring):

http://git-wip-us.apache.org/repos/asf/beam/blob/233a9bd3/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
index af0c72b..e8ca05d 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/datastoreio.py
@@ -92,6 +92,16 @@ class ReadFromDatastore(PTransform):
       namespace: An optional namespace.
       num_splits: Number of splits for the query.
     """
+
+    # Import here to avoid adding the dependency for local running scenarios.
+    try:
+      # pylint: disable=wrong-import-order, wrong-import-position
+      from apitools.base.py import *
+    except ImportError:
+      raise ImportError(
+          'Google Cloud IO not available, '
+          'please install apache_beam[gcp]')
+
     logging.warning('datastoreio read transform is experimental.')
     super(ReadFromDatastore, self).__init__()
 
@@ -368,6 +378,16 @@ class _Mutate(PTransform):
 class WriteToDatastore(_Mutate):
   """A ``PTransform`` to write a ``PCollection[Entity]`` to Cloud Datastore."""
   def __init__(self, project):
+
+    # Import here to avoid adding the dependency for local running scenarios.
+    try:
+      # pylint: disable=wrong-import-order, wrong-import-position
+      from apitools.base.py import *
+    except ImportError:
+      raise ImportError(
+          'Google Cloud IO not available, '
+          'please install apache_beam[gcp]')
+
     super(WriteToDatastore, self).__init__(
         project, WriteToDatastore.to_upsert_mutation)
 

http://git-wip-us.apache.org/repos/asf/beam/blob/233a9bd3/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 1a935c1..1a92c26 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -152,8 +152,13 @@ class DataflowRunner(PipelineRunner):
   def run(self, pipeline):
     """Remotely executes entire pipeline or parts reachable from node."""
     # Import here to avoid adding the dependency for local running scenarios.
-    # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam.runners.dataflow.internal import apiclient
+    try:
+      # pylint: disable=wrong-import-order, wrong-import-position
+      from apache_beam.runners.dataflow.internal import apiclient
+    except ImportError:
+      raise ImportError(
+          'Google Cloud Dataflow runner not available, '
+          'please install apache_beam[gcp]')
     self.job = apiclient.Job(pipeline.options)
 
     # The superclass's run will trigger a traversal of all reachable nodes.

http://git-wip-us.apache.org/repos/asf/beam/blob/233a9bd3/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index de9c892..7e7ec24 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -78,7 +78,15 @@ def create_runner(runner_name):
 
   if '.' in runner_name:
     module, runner = runner_name.rsplit('.', 1)
-    return getattr(__import__(module, {}, {}, [runner], -1), runner)()
+    try:
+      return getattr(__import__(module, {}, {}, [runner], -1), runner)()
+    except ImportError:
+      if runner_name in _KNOWN_DATAFLOW_RUNNERS:
+        raise ImportError(
+            'Google Cloud Dataflow runner not available, '
+            'please install apache_beam[gcp]')
+      else:
+        raise
   else:
     raise ValueError(
         'Unexpected pipeline runner: %s. Valid values are %s '


Mime
View raw message