airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1734][Airflow 1734] Sqoop hook/operator enhancements
Date Sat, 28 Oct 2017 13:08:27 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master efdc4d3b4 -> 1d531555e


[AIRFLOW-1734][Airflow 1734] Sqoop hook/operator enhancements

Closes #2703 from Acehaidrey/sqoop_contrib_fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1d531555
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1d531555
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1d531555

Branch: refs/heads/master
Commit: 1d531555ecd594ee7ec2c5d3fc87f8d4bcc2c27e
Parents: efdc4d3
Author: Ace Haidrey <ahaidrey@pandora.com>
Authored: Sat Oct 28 15:07:56 2017 +0200
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Sat Oct 28 15:07:56 2017 +0200

----------------------------------------------------------------------
 airflow/contrib/hooks/sqoop_hook.py            |  91 ++++++++------
 airflow/contrib/operators/sqoop_operator.py    |  53 +++++++--
 tests/contrib/hooks/test_sqoop_hook.py         | 113 +++++++++++++-----
 tests/contrib/operators/test_sqoop_operator.py | 124 ++++++++++++++++++--
 4 files changed, 293 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1d531555/airflow/contrib/hooks/sqoop_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/sqoop_hook.py b/airflow/contrib/hooks/sqoop_hook.py
index 5b00b15..c56fbcb 100644
--- a/airflow/contrib/hooks/sqoop_hook.py
+++ b/airflow/contrib/hooks/sqoop_hook.py
@@ -43,7 +43,7 @@ class SqoopHook(BaseHook, LoggingMixin):
     :param verbose: Set sqoop to verbose.
     :type verbose: bool
     :param num_mappers: Number of map tasks to import in parallel.
-    :type num_mappers: str
+    :type num_mappers: int
     :param properties: Properties to set via the -D argument
     :type properties: dict
     """
@@ -52,8 +52,6 @@ class SqoopHook(BaseHook, LoggingMixin):
                  num_mappers=None, hcatalog_database=None,
                  hcatalog_table=None, properties=None):
         # No mutable types in the default parameters
-        if properties is None:
-            properties = {}
         self.conn = self.get_connection(conn_id)
         connection_parameters = self.conn.extra_dejson
         self.job_tracker = connection_parameters.get('job_tracker', None)
@@ -66,10 +64,11 @@ class SqoopHook(BaseHook, LoggingMixin):
         self.hcatalog_table = hcatalog_table
         self.verbose = verbose
         self.num_mappers = num_mappers
-        self.properties = properties
+        self.properties = properties or {}
+        self.log.info("Using connection to: {}:{}/{}".format(self.conn.host, self.conn.port, self.conn.schema))
 
     def get_conn(self):
-        pass
+        return self.conn
 
     def cmd_mask_password(self, cmd):
         try:
@@ -87,7 +86,7 @@ class SqoopHook(BaseHook, LoggingMixin):
         :param kwargs: extra arguments to Popen (see subprocess.Popen)
         :return: handle to subprocess
         """
-        self.log.info("Executing command: %s", ' '.join(cmd))
+        self.log.info("Executing command: {}".format(' '.join(self.cmd_mask_password(cmd))))
         sp = subprocess.Popen(cmd,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT,
@@ -101,32 +100,33 @@ class SqoopHook(BaseHook, LoggingMixin):
         self.log.info("Command exited with return code %s", sp.returncode)
 
         if sp.returncode:
-            raise AirflowException("Sqoop command failed: %s", ' '.join(cmd))
+            raise AirflowException("Sqoop command failed: {}".format(' '.join(self.cmd_mask_password(cmd))))
 
     def _prepare_command(self, export=False):
-        if export:
-            connection_cmd = ["sqoop", "export"]
-        else:
-            connection_cmd = ["sqoop", "import"]
+        sqoop_cmd_type = "export" if export else "import"
+        connection_cmd = ["sqoop", sqoop_cmd_type]
 
-        if self.verbose:
-            connection_cmd += ["--verbose"]
+        for key, value in self.properties.items():
+            connection_cmd += ["-D", "{}={}".format(key, value)]
+
+        if self.namenode:
+            connection_cmd += ["-fs", self.namenode]
         if self.job_tracker:
             connection_cmd += ["-jt", self.job_tracker]
-        if self.conn.login:
-            connection_cmd += ["--username", self.conn.login]
-        if self.conn.password:
-            connection_cmd += ["--password", self.conn.password]
-        if self.password_file:
-            connection_cmd += ["--password-file", self.password_file]
         if self.libjars:
             connection_cmd += ["-libjars", self.libjars]
         if self.files:
             connection_cmd += ["-files", self.files]
-        if self.namenode:
-            connection_cmd += ["-fs", self.namenode]
         if self.archives:
             connection_cmd += ["-archives", self.archives]
+        if self.conn.login:
+            connection_cmd += ["--username", self.conn.login]
+        if self.conn.password:
+            connection_cmd += ["--password", self.conn.password]
+        if self.password_file:
+            connection_cmd += ["--password-file", self.password_file]
+        if self.verbose:
+            connection_cmd += ["--verbose"]
         if self.num_mappers:
             connection_cmd += ["--num-mappers", str(self.num_mappers)]
         if self.hcatalog_database:
@@ -134,9 +134,6 @@ class SqoopHook(BaseHook, LoggingMixin):
         if self.hcatalog_table:
             connection_cmd += ["--hcatalog-table", self.hcatalog_table]
 
-        for key, value in self.properties.items():
-            connection_cmd += ["-D", "{}={}".format(key, value)]
-
         connection_cmd += ["--connect", "{}:{}/{}".format(
             self.conn.host,
             self.conn.port,
@@ -145,7 +142,8 @@ class SqoopHook(BaseHook, LoggingMixin):
 
         return connection_cmd
 
-    def _get_export_format_argument(self, file_type='text'):
+    @staticmethod
+    def _get_export_format_argument(file_type='text'):
         if file_type == "avro":
             return ["--as-avrodatafile"]
         elif file_type == "sequence":
@@ -159,11 +157,12 @@ class SqoopHook(BaseHook, LoggingMixin):
                                    "'sequence', 'parquet' or 'text'.")
 
     def _import_cmd(self, target_dir, append, file_type, split_by, direct,
-                    driver):
+                    driver, extra_import_options):
 
         cmd = self._prepare_command(export=False)
 
-        cmd += ["--target-dir", target_dir]
+        if target_dir:
+            cmd += ["--target-dir", target_dir]
 
         if append:
             cmd += ["--append"]
@@ -179,11 +178,16 @@ class SqoopHook(BaseHook, LoggingMixin):
         if driver:
             cmd += ["--driver", driver]
 
+        if extra_import_options:
+            for key, value in extra_import_options.items():
+                cmd += ['--{}'.format(key)]
+                if value: cmd += [value]
+
         return cmd
 
-    def import_table(self, table, target_dir, append=False, file_type="text",
+    def import_table(self, table, target_dir=None, append=False, file_type="text",
                      columns=None, split_by=None, where=None, direct=False,
-                     driver=None):
+                     driver=None, extra_import_options=None):
         """
         Imports table from remote location to target dir. Arguments are
         copies of direct sqoop command line arguments
@@ -197,9 +201,12 @@ class SqoopHook(BaseHook, LoggingMixin):
         :param where: WHERE clause to use during import
         :param direct: Use direct connector if exists for the database
         :param driver: Manually specify JDBC driver class to use
+        :param extra_import_options: Extra import options to pass as dict.
+            If a key doesn't have a value, just pass an empty string to it.
+            Don't include prefix of -- for sqoop options.
         """
         cmd = self._import_cmd(target_dir, append, file_type, split_by, direct,
-                               driver)
+                               driver, extra_import_options)
 
         cmd += ["--table", table]
 
@@ -210,9 +217,8 @@ class SqoopHook(BaseHook, LoggingMixin):
 
         self.Popen(cmd)
 
-    def import_query(self, query, target_dir,
-                     append=False, file_type="text",
-                     split_by=None, direct=None, driver=None):
+    def import_query(self, query, target_dir, append=False, file_type="text",
+                     split_by=None, direct=None, driver=None, extra_import_options=None):
         """
         Imports a specific query from the rdbms to hdfs
         :param query: Free format query to run
@@ -223,9 +229,12 @@ class SqoopHook(BaseHook, LoggingMixin):
         :param split_by: Column of the table used to split work units
         :param direct: Use direct import fast path
         :param driver: Manually specify JDBC driver class to use
+        :param extra_import_options: Extra import options to pass as dict.
+            If a key doesn't have a value, just pass an empty string to it.
+            Don't include prefix of -- for sqoop options.
         """
         cmd = self._import_cmd(target_dir, append, file_type, split_by, direct,
-                               driver)
+                               driver, extra_import_options)
         cmd += ["--query", query]
 
         self.Popen(cmd)
@@ -234,7 +243,7 @@ class SqoopHook(BaseHook, LoggingMixin):
                     input_null_non_string, staging_table, clear_staging_table,
                     enclosed_by, escaped_by, input_fields_terminated_by,
                     input_lines_terminated_by, input_optionally_enclosed_by,
-                    batch, relaxed_isolation):
+                    batch, relaxed_isolation, extra_export_options):
 
         cmd = self._prepare_command(export=True)
 
@@ -275,6 +284,11 @@ class SqoopHook(BaseHook, LoggingMixin):
         if export_dir:
             cmd += ["--export-dir", export_dir]
 
+        if extra_export_options:
+            for key, value in extra_export_options.items():
+                cmd += ['--{}'.format(key)]
+                if value: cmd += [value]
+
         # The required option
         cmd += ["--table", table]
 
@@ -286,7 +300,7 @@ class SqoopHook(BaseHook, LoggingMixin):
                      escaped_by, input_fields_terminated_by,
                      input_lines_terminated_by,
                      input_optionally_enclosed_by, batch,
-                     relaxed_isolation):
+                     relaxed_isolation, extra_export_options=None):
         """
         Exports Hive table to remote location. Arguments are copies of direct
         sqoop command line Arguments
@@ -308,6 +322,9 @@ class SqoopHook(BaseHook, LoggingMixin):
         :param batch: Use batch mode for underlying statement execution
         :param relaxed_isolation: Transaction isolation to read uncommitted
             for the mappers
+        :param extra_export_options: Extra export options to pass as dict.
+            If a key doesn't have a value, just pass an empty string to it.
+            Don't include prefix of -- for sqoop options.
         """
         cmd = self._export_cmd(table, export_dir, input_null_string,
                                input_null_non_string, staging_table,
@@ -315,6 +332,6 @@ class SqoopHook(BaseHook, LoggingMixin):
                                input_fields_terminated_by,
                                input_lines_terminated_by,
                                input_optionally_enclosed_by, batch,
-                               relaxed_isolation)
+                               relaxed_isolation, extra_export_options)
 
         self.Popen(cmd)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1d531555/airflow/contrib/operators/sqoop_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/sqoop_operator.py b/airflow/contrib/operators/sqoop_operator.py
index c3da176..cdaf336 100644
--- a/airflow/contrib/operators/sqoop_operator.py
+++ b/airflow/contrib/operators/sqoop_operator.py
@@ -25,8 +25,15 @@ from airflow.utils.decorators import apply_defaults
 
 class SqoopOperator(BaseOperator):
     """
-    execute sqoop job
+    Execute a Sqoop job.
+    Documentation for Apache Sqoop can be found here: https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html.
     """
+    template_fields = ('conn_id', 'cmd_type', 'table', 'query', 'target_dir', 'file_type', 'columns', 'split_by',
+                       'where', 'export_dir', 'input_null_string', 'input_null_non_string', 'staging_table',
+                       'enclosed_by', 'escaped_by', 'input_fields_terminated_by', 'input_lines_terminated_by',
+                       'input_optionally_enclosed_by', 'properties', 'extra_import_options', 'driver',
+                       'extra_export_options', 'hcatalog_database', 'hcatalog_table',)
+    ui_color = '#7D8CA4'
 
     @apply_defaults
     def __init__(self,
@@ -36,7 +43,7 @@ class SqoopOperator(BaseOperator):
                  query=None,
                  target_dir=None,
                  append=None,
-                 file_type=None,
+                 file_type='text',
                  columns=None,
                  num_mappers=None,
                  split_by=None,
@@ -59,12 +66,18 @@ class SqoopOperator(BaseOperator):
                  properties=None,
                  hcatalog_database=None,
                  hcatalog_table=None,
+                 create_hcatalog_table=False,
+                 extra_import_options=None,
+                 extra_export_options=None,
                  *args,
                  **kwargs):
         """
         :param conn_id: str
         :param cmd_type: str specify command to execute "export" or "import"
         :param table: Table to read
+        :param query: Import result of arbitrary SQL query. Instead of using the table,
+            columns and where arguments, you can specify a SQL statement with the query
+            argument. Must also specify a destination directory with target_dir.
         :param target_dir: HDFS destination directory where the data
             from the rdbms will be written
         :param append: Append data to an existing dataset in HDFS
@@ -95,7 +108,14 @@ class SqoopOperator(BaseOperator):
         :param relaxed_isolation: use read uncommitted isolation level
         :param hcatalog_database: Specifies the database name for the HCatalog table
         :param hcatalog_table: The argument value for this option is the HCatalog table
+        :param create_hcatalog_table: Have sqoop create the hcatalog table passed in or not
         :param properties: additional JVM properties passed to sqoop
+        :param extra_import_options: Extra import options to pass as dict.
+            If a key doesn't have a value, just pass an empty string to it.
+            Don't include prefix of -- for sqoop options.
+        :param extra_export_options: Extra export options to pass as dict.
+            If a key doesn't have a value, just pass an empty string to it.
+            Don't include prefix of -- for sqoop options.
         """
         super(SqoopOperator, self).__init__(*args, **kwargs)
         self.conn_id = conn_id
@@ -126,10 +146,10 @@ class SqoopOperator(BaseOperator):
         self.relaxed_isolation = relaxed_isolation
         self.hcatalog_database = hcatalog_database
         self.hcatalog_table = hcatalog_table
-        # No mutable types in the default parameters
-        if properties is None:
-            properties = {}
+        self.create_hcatalog_table = create_hcatalog_table
         self.properties = properties
+        self.extra_import_options = extra_import_options
+        self.extra_export_options = extra_export_options
 
     def execute(self, context):
         """
@@ -156,9 +176,18 @@ class SqoopOperator(BaseOperator):
                 input_lines_terminated_by=self.input_lines_terminated_by,
                 input_optionally_enclosed_by=self.input_optionally_enclosed_by,
                 batch=self.batch,
-                relaxed_isolation=self.relaxed_isolation)
+                relaxed_isolation=self.relaxed_isolation,
+                extra_export_options=self.extra_export_options)
         elif self.cmd_type == 'import':
-            if not self.table:
+            # add create hcatalog table to extra import options if option passed
+            # if new params are added to constructor can pass them in here so don't modify sqoop_hook for each param
+            if self.create_hcatalog_table:
+                self.extra_import_options['create-hcatalog-table'] = ''
+
+            if self.table and self.query:
+                raise AirflowException('Cannot specify query and table together. Need to specify either or.')
+
+            if self.table:
                 hook.import_table(
                     table=self.table,
                     target_dir=self.target_dir,
@@ -168,16 +197,18 @@ class SqoopOperator(BaseOperator):
                     split_by=self.split_by,
                     where=self.where,
                     direct=self.direct,
-                    driver=self.driver)
-            elif not self.query:
+                    driver=self.driver,
+                    extra_import_options=self.extra_import_options)
+            elif self.query:
                 hook.import_query(
-                    query=self.table,
+                    query=self.query,
                     target_dir=self.target_dir,
                     append=self.append,
                     file_type=self.file_type,
                     split_by=self.split_by,
                     direct=self.direct,
-                    driver=self.driver)
+                    driver=self.driver,
+                    extra_import_options=self.extra_import_options)
             else:
                 raise AirflowException(
                     "Provide query or table parameter to import using Sqoop"

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1d531555/tests/contrib/hooks/test_sqoop_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_sqoop_hook.py b/tests/contrib/hooks/test_sqoop_hook.py
index 7c934b9..3eba9ec 100644
--- a/tests/contrib/hooks/test_sqoop_hook.py
+++ b/tests/contrib/hooks/test_sqoop_hook.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 #
 
+import collections
 import json
 import unittest
 
@@ -40,8 +41,8 @@ class TestSqoopHook(unittest.TestCase):
     _config_export = {
         'table': 'domino.export_data_to',
         'export_dir': '/hdfs/data/to/be/exported',
-        'input_null_string': '\n',
-        'input_null_non_string': '\t',
+        'input_null_string': '\\n',
+        'input_null_non_string': '\\t',
         'staging_table': 'database.staging',
         'clear_staging_table': True,
         'enclosed_by': '"',
@@ -50,7 +51,11 @@ class TestSqoopHook(unittest.TestCase):
         'input_lines_terminated_by': '\n',
         'input_optionally_enclosed_by': '"',
         'batch': True,
-        'relaxed_isolation': True
+        'relaxed_isolation': True,
+        'extra_export_options': collections.OrderedDict([
+            ('update-key', 'id'),
+            ('update-mode', 'allowinsert')
+        ])
     }
     _config_import = {
         'target_dir': '/hdfs/data/target/location',
@@ -58,7 +63,11 @@ class TestSqoopHook(unittest.TestCase):
         'file_type': 'parquet',
         'split_by': '\n',
         'direct': True,
-        'driver': 'com.microsoft.jdbc.sqlserver.SQLServerDriver'
+        'driver': 'com.microsoft.jdbc.sqlserver.SQLServerDriver',
+        'extra_import_options': {
+            'hcatalog-storage-stanza': "\"stored as orcfile\"",
+            'show': ''
+        }
     }
 
     _config_json = {
@@ -94,10 +103,10 @@ class TestSqoopHook(unittest.TestCase):
         self.assertEqual(mock_popen.mock_calls[0], call(
             ['sqoop',
              'export',
+             '-fs', self._config_json['namenode'],
              '-jt', self._config_json['job_tracker'],
              '-libjars', self._config_json['libjars'],
              '-files', self._config_json['files'],
-             '-fs', self._config_json['namenode'],
              '-archives', self._config_json['archives'],
              '--connect', 'rmdbs:5050/schema',
              '--input-null-string', self._config_export['input_null_string'],
@@ -112,9 +121,25 @@ class TestSqoopHook(unittest.TestCase):
              '--batch',
              '--relaxed-isolation',
              '--export-dir', self._config_export['export_dir'],
+             '--update-key', 'id',
+             '--update-mode', 'allowinsert',
              '--table', self._config_export['table']], stderr=-2, stdout=-1))
 
+    def test_submit_none_mappers(self):
+        """
+        Test to check that if value of num_mappers is None, then it shouldn't be in the cmd built.
+        """
+        _config_without_mappers = self._config.copy()
+        _config_without_mappers['num_mappers'] = None
+
+        hook = SqoopHook(**_config_without_mappers)
+        cmd = ' '.join(hook._prepare_command())
+        self.assertNotIn('--num-mappers', cmd)
+
     def test_submit(self):
+        """
+        Tests to verify that from connection extra option the options are added to the Sqoop command.
+        """
         hook = SqoopHook(**self._config)
 
         cmd = ' '.join(hook._prepare_command())
@@ -124,20 +149,16 @@ class TestSqoopHook(unittest.TestCase):
             self.assertIn("-fs {}".format(self._config_json['namenode']), cmd)
 
         if self._config_json['job_tracker']:
-            self.assertIn("-jt {}".format(self._config_json['job_tracker']),
-                          cmd)
+            self.assertIn("-jt {}".format(self._config_json['job_tracker']), cmd)
 
         if self._config_json['libjars']:
-            self.assertIn("-libjars {}".format(self._config_json['libjars']),
-                          cmd)
+            self.assertIn("-libjars {}".format(self._config_json['libjars']), cmd)
 
         if self._config_json['files']:
             self.assertIn("-files {}".format(self._config_json['files']), cmd)
 
         if self._config_json['archives']:
-            self.assertIn(
-                "-archives {}".format(self._config_json['archives']), cmd
-            )
+            self.assertIn( "-archives {}".format(self._config_json['archives']), cmd)
 
         self.assertIn("--hcatalog-database {}".format(self._config['hcatalog_database']), cmd)
         self.assertIn("--hcatalog-table {}".format(self._config['hcatalog_table']), cmd)
@@ -147,11 +168,8 @@ class TestSqoopHook(unittest.TestCase):
             self.assertIn("--verbose", cmd)
 
         if self._config['num_mappers']:
-            self.assertIn(
-                "--num-mappers {}".format(self._config['num_mappers']), cmd
-            )
+            self.assertIn( "--num-mappers {}".format(self._config['num_mappers']), cmd)
 
-        print(self._config['properties'])
         for key, value in self._config['properties'].items():
             self.assertIn("-D {}={}".format(key, value), cmd)
 
@@ -161,14 +179,15 @@ class TestSqoopHook(unittest.TestCase):
             hook.export_table(**self._config_export)
 
         with self.assertRaises(OSError):
-            hook.import_table(table='schema.table',
-                              target_dir='/sqoop/example/path')
+            hook.import_table(table='schema.table', target_dir='/sqoop/example/path')
 
         with self.assertRaises(OSError):
-            hook.import_query(query='SELECT * FROM sometable',
-                              target_dir='/sqoop/example/path')
+            hook.import_query(query='SELECT * FROM sometable', target_dir='/sqoop/example/path')
 
     def test_export_cmd(self):
+        """
+        Tests to verify the hook export command is building correct Sqoop export command.
+        """
         hook = SqoopHook()
 
         # The subprocess requires an array but we build the cmd by joining on a space
@@ -190,7 +209,9 @@ class TestSqoopHook(unittest.TestCase):
                 input_optionally_enclosed_by=self._config_export[
                     'input_optionally_enclosed_by'],
                 batch=self._config_export['batch'],
-                relaxed_isolation=self._config_export['relaxed_isolation'])
+                relaxed_isolation=self._config_export['relaxed_isolation'],
+                extra_export_options=self._config_export['extra_export_options']
+            )
         )
 
         self.assertIn("--input-null-string {}".format(
@@ -209,6 +230,9 @@ class TestSqoopHook(unittest.TestCase):
             self._config_export['input_lines_terminated_by']), cmd)
         self.assertIn("--input-optionally-enclosed-by {}".format(
             self._config_export['input_optionally_enclosed_by']), cmd)
+        # these options are from the extra export options
+        self.assertIn("--update-key id", cmd)
+        self.assertIn("--update-mode allowinsert", cmd)
 
         if self._config_export['clear_staging_table']:
             self.assertIn("--clear-staging-table", cmd)
@@ -220,16 +244,22 @@ class TestSqoopHook(unittest.TestCase):
             self.assertIn("--relaxed-isolation", cmd)
 
     def test_import_cmd(self):
+        """
+        Tests to verify the hook import command is building correct Sqoop import command.
+        """
         hook = SqoopHook()
 
         # The subprocess requires an array but we build the cmd by joining on a space
         cmd = ' '.join(
-            hook._import_cmd(self._config_import['target_dir'],
-                             append=self._config_import['append'],
-                             file_type=self._config_import['file_type'],
-                             split_by=self._config_import['split_by'],
-                             direct=self._config_import['direct'],
-                             driver=self._config_import['driver'])
+            hook._import_cmd(
+                self._config_import['target_dir'],
+                append=self._config_import['append'],
+                file_type=self._config_import['file_type'],
+                split_by=self._config_import['split_by'],
+                direct=self._config_import['direct'],
+                driver=self._config_import['driver'],
+                extra_import_options=None
+            )
         )
 
         if self._config_import['append']:
@@ -242,10 +272,32 @@ class TestSqoopHook(unittest.TestCase):
             self._config_import['target_dir']), cmd)
 
         self.assertIn('--driver {}'.format(self._config_import['driver']), cmd)
-        self.assertIn('--split-by {}'.format(self._config_import['split_by']),
-                      cmd)
+        self.assertIn('--split-by {}'.format(self._config_import['split_by']), cmd)
+        # these are from extra options, but not passed to this cmd import command
+        self.assertNotIn('--show', cmd)
+        self.assertNotIn('hcatalog-storage-stanza \"stored as orcfile\"', cmd)
+
+        cmd = ' '.join(
+            hook._import_cmd(
+                target_dir=None,
+                append=self._config_import['append'],
+                file_type=self._config_import['file_type'],
+                split_by=self._config_import['split_by'],
+                direct=self._config_import['direct'],
+                driver=self._config_import['driver'],
+                extra_import_options=self._config_import['extra_import_options']
+            )
+        )
+
+        self.assertNotIn('--target-dir', cmd)
+        # these checks are from the extra import options
+        self.assertIn('--show', cmd)
+        self.assertIn('hcatalog-storage-stanza \"stored as orcfile\"', cmd)
 
     def test_get_export_format_argument(self):
+        """
+        Tests to verify the hook get format function is building correct Sqoop command with correct format type.
+        """
         hook = SqoopHook()
         self.assertIn("--as-avrodatafile",
                       hook._get_export_format_argument('avro'))
@@ -259,6 +311,9 @@ class TestSqoopHook(unittest.TestCase):
             hook._get_export_format_argument('unknown')
 
     def test_cmd_mask_password(self):
+        """
+        Tests to verify the hook masking function will correctly mask a user password in Sqoop command.
+        """
         hook = SqoopHook()
         self.assertEqual(
             hook.cmd_mask_password(['--password', 'supersecret']),

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1d531555/tests/contrib/operators/test_sqoop_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_sqoop_operator.py b/tests/contrib/operators/test_sqoop_operator.py
index f632805..d9e39b5 100644
--- a/tests/contrib/operators/test_sqoop_operator.py
+++ b/tests/contrib/operators/test_sqoop_operator.py
@@ -23,6 +23,7 @@ from airflow.exceptions import AirflowException
 
 class TestSqoopOperator(unittest.TestCase):
     _config = {
+        'conn_id': 'sqoop_default',
         'cmd_type': 'export',
         'table': 'target_table',
         'query': 'SELECT * FROM schema.table',
@@ -46,10 +47,19 @@ class TestSqoopOperator(unittest.TestCase):
         'relaxed_isolation': True,
         'direct': True,
         'driver': 'com.microsoft.jdbc.sqlserver.SQLServerDriver',
+        'create_hcatalog_table': True,
         'hcatalog_database': 'hive_database',
         'hcatalog_table': 'hive_table',
         'properties': {
             'mapred.map.max.attempts': '1'
+        },
+        'extra_import_options': {
+            'hcatalog-storage-stanza': "\"stored as orcfile\"",
+            'show': ''
+        },
+        'extra_export_options': {
+            'update-key': 'id',
+            'update-mode': 'allowinsert'
         }
     }
 
@@ -61,15 +71,18 @@ class TestSqoopOperator(unittest.TestCase):
         }
         self.dag = DAG('test_dag_id', default_args=args)
 
-    def test_execute(self, conn_id='sqoop_default'):
+    def test_execute(self):
+        """
+        Tests to verify values of the SqoopOperator match that passed in from the config.
+        """
         operator = SqoopOperator(
             task_id='sqoop_job',
             dag=self.dag,
             **self._config
         )
 
-        self.assertEqual(conn_id, operator.conn_id)
-
+        self.assertEqual(self._config['conn_id'], operator.conn_id)
+        self.assertEqual(self._config['query'], operator.query)
         self.assertEqual(self._config['cmd_type'], operator.cmd_type)
         self.assertEqual(self._config['table'], operator.table)
         self.assertEqual(self._config['target_dir'], operator.target_dir)
@@ -77,28 +90,117 @@ class TestSqoopOperator(unittest.TestCase):
         self.assertEqual(self._config['file_type'], operator.file_type)
         self.assertEqual(self._config['num_mappers'], operator.num_mappers)
         self.assertEqual(self._config['split_by'], operator.split_by)
-        self.assertEqual(self._config['input_null_string'],
-                         operator.input_null_string)
-        self.assertEqual(self._config['input_null_non_string'],
-                         operator.input_null_non_string)
+        self.assertEqual(self._config['input_null_string'], operator.input_null_string)
+        self.assertEqual(self._config['input_null_non_string'], operator.input_null_non_string)
         self.assertEqual(self._config['staging_table'], operator.staging_table)
-        self.assertEqual(self._config['clear_staging_table'],
-                         operator.clear_staging_table)
+        self.assertEqual(self._config['clear_staging_table'], operator.clear_staging_table)
         self.assertEqual(self._config['batch'], operator.batch)
-        self.assertEqual(self._config['relaxed_isolation'],
-                         operator.relaxed_isolation)
+        self.assertEqual(self._config['relaxed_isolation'], operator.relaxed_isolation)
         self.assertEqual(self._config['direct'], operator.direct)
         self.assertEqual(self._config['driver'], operator.driver)
         self.assertEqual(self._config['properties'], operator.properties)
         self.assertEqual(self._config['hcatalog_database'], operator.hcatalog_database)
         self.assertEqual(self._config['hcatalog_table'], operator.hcatalog_table)
+        self.assertEqual(self._config['create_hcatalog_table'], operator.create_hcatalog_table)
+        self.assertEqual(self._config['extra_import_options'], operator.extra_import_options)
+        self.assertEqual(self._config['extra_export_options'], operator.extra_export_options)
+
+        # the following operators are only constructed, as usage examples
+        sqoop_import_op = SqoopOperator(
+            task_id='sqoop_import_using_table',
+            cmd_type='import',
+            conn_id='sqoop_default',
+            table='company',
+            verbose=True,
+            num_mappers=8,
+            hcatalog_database='default',
+            hcatalog_table='import_table_1',
+            create_hcatalog_table=True,
+            extra_import_options={'hcatalog-storage-stanza': "\"stored as orcfile\""},
+            dag=self.dag
+        )
+
+        sqoop_import_op_qry = SqoopOperator(
+            task_id='sqoop_import_using_query',
+            cmd_type='import',
+            conn_id='sqoop_default',
+            query='select name, age from company where $CONDITIONS',
+            split_by='age',  # mappers fill in $CONDITIONS based on the split_by field
+            verbose=True,
+            num_mappers=None,
+            hcatalog_database='default',
+            hcatalog_table='import_table_2',
+            create_hcatalog_table=True,
+            extra_import_options={'hcatalog-storage-stanza': "\"stored as orcfile\""},
+            dag=self.dag
+        )
+
+        sqoop_import_op_with_partition = SqoopOperator(
+            task_id='sqoop_import_with_partition',
+            cmd_type='import',
+            conn_id='sqoop_default',
+            table='company',
+            verbose=True,
+            num_mappers=None,
+            hcatalog_database='default',
+            hcatalog_table='import_table_3',
+            create_hcatalog_table=True,
+            extra_import_options={
+                'hcatalog-storage-stanza': "\"stored as orcfile\"",
+                'hive-partition-key': 'day',
+                'hive-partition-value': '2017-10-18'},
+            dag=self.dag
+        )
+
+        sqoop_export_op_name = SqoopOperator(
+            task_id='sqoop_export_tablename',
+            cmd_type='export',
+            conn_id='sqoop_default',
+            table='rbdms_export_table_1',
+            verbose=True,
+            num_mappers=None,
+            hcatalog_database='default',
+            hcatalog_table='hive_export_table_1',
+            extra_export_options=None,
+            dag=self.dag
+        )
+
+        sqoop_export_op_path = SqoopOperator(
+            task_id='sqoop_export_tablepath',
+            cmd_type='export',
+            conn_id='sqoop_default',
+            table='rbdms_export_table_2',
+            export_dir='/user/hive/warehouse/export_table_2',
+            direct=True,  # direct mode speeds up the data transfer
+            verbose=True,
+            num_mappers=None,
+            extra_export_options=None,
+            dag=self.dag
+        )
 
     def test_invalid_cmd_type(self):
+        """
+        Tests that an exception is raised when the cmd_type is neither import nor export.
+        """
         operator = SqoopOperator(task_id='sqoop_job', dag=self.dag,
                                  cmd_type='invalid')
         with self.assertRaises(AirflowException):
             operator.execute({})
 
+    def test_invalid_import_options(self):
+        """
+        Tests that an import raises an exception when both a query and a table are passed.
+        """
+        import_query_and_table_configs = self._config.copy()
+        import_query_and_table_configs['cmd_type'] = 'import'
+        operator = SqoopOperator(
+            task_id='sqoop_job',
+            dag=self.dag,
+            **import_query_and_table_configs
+        )
+        with self.assertRaises(AirflowException):
+            operator.execute({})
+
 
 if __name__ == '__main__':
     unittest.main()


Mime
View raw message