airflow-commits mailing list archives

From san...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-361] Add default failure handler for the Qubole Operator
Date Thu, 25 Aug 2016 06:23:09 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master ee7c6b0e9 -> 16fac9890


[AIRFLOW-361] Add default failure handler for the Qubole Operator

The default failure and retry handler in the Qubole Operator makes
sure that commands at Qubole and tasks in Airflow stay in sync, and
that no unwatched commands are left running on the Qubole side.
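
As an illustration only (not part of this commit), a DAG author could override the
default handler with a callback of their own. The sketch below is hypothetical: it
assumes a DAG object named `dag` exists and reuses the `qbol_cmd_id` XCom key that
QuboleHook pushes.

    from qds_sdk.commands import Command
    from airflow.contrib.operators.qubole_operator import QuboleOperator

    def cancel_qubole_command(context):
        # Look up the Qubole command id that QuboleHook pushed to XCom for this task
        ti = context['ti']
        cmd_id = ti.xcom_pull(key='qbol_cmd_id', task_ids=ti.task_id)
        if cmd_id is not None:
            cmd = Command.find(cmd_id)
            if cmd is not None and cmd.status == 'running':
                cmd.cancel()  # leave nothing running unwatched on the Qubole side

    # Passing explicit callbacks overrides the default QuboleHook.handle_failure_retry
    hive_task = QuboleOperator(
        task_id='hive_show_tables',
        command_type='hivecmd',
        query='show tables',
        on_failure_callback=cancel_qubole_command,
        on_retry_callback=cancel_qubole_command,
        dag=dag)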

Dear Airflow Maintainers,

Please accept this PR that addresses the following
issues:
- *https://issues.apache.org/jira/browse/AIRFLOW-361*

Thanks,
Sumit

Closes #1682 from msumit/AIRFLOW-361


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/16fac989
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/16fac989
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/16fac989

Branch: refs/heads/master
Commit: 16fac9890aa72490665ad49cc1e150c0d5591d4f
Parents: ee7c6b0
Author: Sumit Maheshwari <sumitm@qubole.com>
Authored: Wed Aug 24 23:19:35 2016 -0700
Committer: Siddharth Anand <siddharthanand@yahoo.com>
Committed: Wed Aug 24 23:19:44 2016 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/qubole_hook.py         | 64 +++++++++++++++++------
 airflow/contrib/operators/qubole_operator.py | 52 ++++++++++++------
 2 files changed, 83 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16fac989/airflow/contrib/hooks/qubole_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/qubole_hook.py b/airflow/contrib/hooks/qubole_hook.py
index 694b12f..93db72a 100755
--- a/airflow/contrib/hooks/qubole_hook.py
+++ b/airflow/contrib/hooks/qubole_hook.py
@@ -22,10 +22,12 @@ import six
 from airflow.exceptions import AirflowException
 from airflow.hooks.base_hook import BaseHook
 from airflow import configuration
+from airflow.utils.state import State
 
 from qds_sdk.qubole import Qubole
-from qds_sdk.commands import Command, HiveCommand, PrestoCommand, HadoopCommand, PigCommand, ShellCommand, \
-    SparkCommand, DbTapQueryCommand, DbExportCommand, DbImportCommand
+from qds_sdk.commands import Command, HiveCommand, PrestoCommand, HadoopCommand, \
+    PigCommand, ShellCommand, SparkCommand, DbTapQueryCommand, DbExportCommand, \
+    DbImportCommand
 
 
 COMMAND_CLASSES = {
@@ -45,15 +47,24 @@ HYPHEN_ARGS = ['cluster_label', 'app_id']
 POSITIONAL_ARGS = ['sub_command', 'parameters']
 
 COMMAND_ARGS = {
-    "hivecmd": ['query', 'script_location', 'macros', 'tags', 'sample_size', 'cluster_label',
'name'],
+    "hivecmd": ['query', 'script_location', 'macros', 'tags', 'sample_size',
+                'cluster_label', 'name'],
     'prestocmd': ['query', 'script_location', 'macros', 'tags', 'cluster_label', 'name'],
     'hadoopcmd': ['sub_command', 'tags', 'cluster_label', 'name'],
-    'shellcmd': ['script', 'script_location', 'files', 'archives', 'parameters', 'tags', 'cluster_label', 'name'],
-    'pigcmd': ['script', 'script_location', 'parameters', 'tags', 'cluster_label', 'name'],
+    'shellcmd': ['script', 'script_location', 'files', 'archives', 'parameters', 'tags',
+                 'cluster_label', 'name'],
+    'pigcmd': ['script', 'script_location', 'parameters', 'tags', 'cluster_label',
+               'name'],
     'dbtapquerycmd': ['db_tap_id', 'query', 'macros', 'tags', 'name'],
-    'sparkcmd': ['program', 'cmdline', 'sql', 'script_location', 'macros', 'tags', 'cluster_label', 'language', 'app_id', 'name', 'arguments', 'user_program_arguments'],
-    'dbexportcmd': ['mode', 'hive_table', 'partition_spec', 'dbtap_id', 'db_table', 'db_update_mode', 'db_update_keys', 'export_dir', 'fields_terminated_by', 'tags', 'name'],
-    'dbimportcmd': ['mode', 'hive_table', 'dbtap_id', 'db_table', 'where_clause', 'parallelism', 'extract_query', 'boundary_query', 'split_column', 'tags', 'name']
+    'sparkcmd': ['program', 'cmdline', 'sql', 'script_location', 'macros', 'tags',
+                 'cluster_label', 'language', 'app_id', 'name', 'arguments',
+                 'user_program_arguments'],
+    'dbexportcmd': ['mode', 'hive_table', 'partition_spec', 'dbtap_id', 'db_table',
+                    'db_update_mode', 'db_update_keys', 'export_dir',
+                    'fields_terminated_by', 'tags', 'name'],
+    'dbimportcmd': ['mode', 'hive_table', 'dbtap_id', 'db_table', 'where_clause',
+                    'parallelism', 'extract_query', 'boundary_query', 'split_column',
+                    'tags', 'name']
 }
 
 
@@ -67,22 +78,41 @@ class QuboleHook(BaseHook):
         self.cls = COMMAND_CLASSES[self.kwargs['command_type']]
         self.cmd = None
 
+    @staticmethod
+    def handle_failure_retry(context):
+        ti = context['ti']
+        cmd_id = ti.xcom_pull(key='qbol_cmd_id', task_ids=ti.task_id)
+
+        if cmd_id is not None:
+            logger = logging.getLogger("QuboleHook")
+            cmd = Command.find(cmd_id)
+            if cmd is not None:
+                if cmd.status == 'done':
+                    logger.info('Command ID: %s has been succeeded, hence marking this '
+                                'TI as Success.', cmd_id)
+                    ti.state = State.SUCCESS
+                elif cmd.status == 'running':
+                    logger.info('Cancelling the Qubole Command Id: %s', cmd_id)
+                    cmd.cancel()
+
     def execute(self, context):
         args = self.cls.parse(self.create_cmd_args(context))
         self.cmd = self.cls.create(**args)
         context['task_instance'].xcom_push(key='qbol_cmd_id', value=self.cmd.id)
-        logging.info("Qubole command created with Id: {0} and Status: {1}".format(str(self.cmd.id),
self.cmd.status))
+        logging.info("Qubole command created with Id: %s and Status: %s",
+                     self.cmd.id, self.cmd.status)
 
         while not Command.is_done(self.cmd.status):
             time.sleep(Qubole.poll_interval)
             self.cmd = self.cls.find(self.cmd.id)
-            logging.info("Command Id: {0} and Status: {1}".format(str(self.cmd.id), self.cmd.status))
+            logging.info("Command Id: %s and Status: %s", self.cmd.id, self.cmd.status)
 
         if 'fetch_logs' in self.kwargs and self.kwargs['fetch_logs'] is True:
-            logging.info("Logs for Command Id: {0} \n{1}".format(str(self.cmd.id), self.cmd.get_log()))
+            logging.info("Logs for Command Id: %s \n%s", self.cmd.id, self.cmd.get_log())
 
         if self.cmd.status != 'done':
-            raise AirflowException('Command Id: {0} failed with Status: {1}'.format(self.cmd.id, self.cmd.status))
+            raise AirflowException('Command Id: {0} failed with Status: {1}'.format(
+                                   self.cmd.id, self.cmd.status))
 
     def kill(self, ti):
         """
@@ -91,20 +121,20 @@ class QuboleHook(BaseHook):
         :return: response from Qubole
         """
         if self.cmd is None:
-            cmd_id = ti.xcom_pull(key="return_value", task_ids=self.task_id)
+            cmd_id = ti.xcom_pull(key="qbol_cmd_id", task_ids=ti.task_id)
             self.cmd = self.cls.find(cmd_id)
         if self.cls and self.cmd:
-            logging.info('Sending KILL signal to Qubole Command Id: {0}'.format(self.cmd.id))
+            logging.info('Sending KILL signal to Qubole Command Id: %s', self.cmd.id)
             self.cmd.cancel()
 
     def get_results(self, ti=None, fp=None, inline=True, delim=None, fetch=True):
         """
-        Get results (or just s3 locations) of a command from Qubole and save it into a file
+        Get results (or just s3 locations) of a command from Qubole and save into a file
         :param ti: Task Instance of the dag, used to determine the Quboles command id
         :param fp: Optional file pointer, will create one and return if None passed
         :param inline: True to download actual results, False to get s3 locations only
-        :param delim: Replaces the CTL-A chars with the given delim, defalt to ',' if None given
-        :param fetch: when inline is True, get results directly from s3 if results are large
+        :param delim: Replaces the CTL-A chars with the given delim, defaults to ','
+        :param fetch: when inline is True, get results directly from s3 (if large)
         :return: file location containing actual results or s3 locations of results
         """
         if fp is None:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/16fac989/airflow/contrib/operators/qubole_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/qubole_operator.py b/airflow/contrib/operators/qubole_operator.py
index c462dca..a27458e 100755
--- a/airflow/contrib/operators/qubole_operator.py
+++ b/airflow/contrib/operators/qubole_operator.py
@@ -42,20 +42,28 @@ class QuboleOperator(BaseOperator):
             :script_location: s3 location containing query statement
             :macros: macro values which were used in query
         hadoopcmd:
-            :sub_commnad: must be one these ["jar", "s3distcp", "streaming"] followed by 1 or more args
+            :sub_commnad: must be one these ["jar", "s3distcp", "streaming"] followed by
+                1 or more args
         shellcmd:
             :script: inline command with args
             :script_location: s3 location containing query statement
-            :files: list of files in s3 bucket as file1,file2 format. These files will be copied into the working directory where the qubole command is being executed.
-            :archives: list of archives in s3 bucket as archive1,archive2 format. These will be unarchived intothe working directory where the qubole command is being executed
-            :parameters: any extra args which need to be passed to script (only when script_location is supplied)
+            :files: list of files in s3 bucket as file1,file2 format. These files will be
+                copied into the working directory where the qubole command is being
+                executed.
+            :archives: list of archives in s3 bucket as archive1,archive2 format. These
+                will be unarchived intothe working directory where the qubole command is
+                being executed
+            :parameters: any extra args which need to be passed to script (only when
+                script_location is supplied)
         pigcmd:
             :script: inline query statement (latin_statements)
             :script_location: s3 location containing pig query
-            :parameters: any extra args which need to be passed to script (only when script_location is supplied
+            :parameters: any extra args which need to be passed to script (only when
+                script_location is supplied
         sparkcmd:
             :program: the complete Spark Program in Scala, SQL, Command, R, or Python
-            :cmdline: spark-submit command line, all required information must be specify in cmdline itself.
+            :cmdline: spark-submit command line, all required information must be specify
+                in cmdline itself.
             :sql: inline sql query
             :script_location: s3 location containing query statement
             :language: language of the program, Scala, SQL, Command, R, or Python
@@ -76,7 +84,7 @@ class QuboleOperator(BaseOperator):
             :db_update_mode: allowinsert or updateonly
             :db_update_keys: columns used to determine the uniqueness of rows
             :export_dir: HDFS/S3 location from which data will be exported.
-            :fields_terminated_by: hex of the char used as column separator in the dataset.
+            :fields_terminated_by: hex of the char used as column separator in the dataset
         dbimportcmd:
             :mode: 1 (simple), 2 (advance)
             :hive_table: Name of the hive table
@@ -84,17 +92,26 @@ class QuboleOperator(BaseOperator):
             :db_table: name of the db table
             :where_clause: where clause, if any
             :parallelism: number of parallel db connections to use for extracting data
-            :extract_query: SQL query to extract data from db. $CONDITIONS must be part of the where clause.
+            :extract_query: SQL query to extract data from db. $CONDITIONS must be part
+                of the where clause.
             :boundary_query: Query to be used get range of row IDs to be extracted
             :split_column: Column used as row ID to split data into ranges (mode 2)
 
-    .. note:: Following fields are template-supported : ``query``, ``script_location``, ``sub_command``, ``script``, ``files``,
-        ``archives``, ``program``, ``cmdline``, ``sql``, ``where_clause``, ``extract_query``, ``boundary_query``, ``macros``, ``tags``,
-        ``name``, ``parameters``. You can also use ``.txt`` files for template driven use cases.
+    .. note:: Following fields are template-supported : ``query``, ``script_location``,
+        ``sub_command``, ``script``, ``files``, ``archives``, ``program``, ``cmdline``,
+        ``sql``, ``where_clause``, ``extract_query``, ``boundary_query``, ``macros``,
+        ``tags``, ``name``, ``parameters``. You can also use ``.txt`` files for template
+        driven use cases.
+
+    .. note:: In QuboleOperator there is a default handler for task failures and retries,
+        which generally kills the command running at QDS for the corresponding task
+        instance. You can override this behavior by providing your own failure and retry
+        handler in task definition.
     """
 
-    template_fields = ('query', 'script_location', 'sub_command', 'script', 'files', 'archives', 'program', 'cmdline',
-                       'sql', 'where_clause', 'extract_query', 'boundary_query', 'macros', 'tags', 'name', 'parameters')
+    template_fields = ('query', 'script_location', 'sub_command', 'script', 'files',
+                       'archives', 'program', 'cmdline', 'sql', 'where_clause', 'tags',
+                       'extract_query', 'boundary_query', 'macros', 'name', 'parameters')
     template_ext = ('.txt',)
     ui_color = '#3064A1'
     ui_fgcolor = '#fff'
@@ -107,6 +124,12 @@ class QuboleOperator(BaseOperator):
         self.hook = QuboleHook(*self.args, **self.kwargs)
         super(QuboleOperator, self).__init__(*args, **kwargs)
 
+        if self.on_failure_callback is None:
+            self.on_failure_callback = QuboleHook.handle_failure_retry
+
+        if self.on_retry_callback is None:
+            self.on_retry_callback = QuboleHook.handle_failure_retry
+
     def execute(self, context):
         # Reinitiating the hook, as some template fields might have changed
         self.hook = QuboleHook(*self.args, **self.kwargs)
@@ -138,6 +161,3 @@ class QuboleOperator(BaseOperator):
             self.kwargs[name] = value
         else:
             object.__setattr__(self, name, value)
-
-
-

