airflow-commits mailing list archives

From bo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1324] Generalize Druid operator and hook
Date Fri, 18 Aug 2017 19:35:12 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master d22340aab -> de99aa20f


[AIRFLOW-1324] Generalize Druid operator and hook

Make the Druid operator and hook more generic. This allows for a more
flexible configuration, for example ingesting Parquet. Also get rid of
the PyDruid dependency, since it is focused on querying Druid rather
than ingesting data; plain requests is sufficient to submit an indexing
job. Add a test to the hive_to_druid operator to make sure it behaves
as we expect. Furthermore, clean up the docstrings a bit.

Closes #2378 from Fokko/AIRFLOW-1324-make-more-general-druid-hook-and-operator
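
As a rough illustration of the generalized operator (not part of this commit), a DAG could submit a prebuilt
index spec roughly like this; the DAG id, schedule and 'index_spec.json' file below are hypothetical:

    # Hypothetical usage sketch of the new DruidOperator; assumes an existing
    # 'index_spec.json' file and the default 'druid_ingest_default' connection.
    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.druid_operator import DruidOperator

    dag = DAG('druid_indexing_example', start_date=datetime(2017, 1, 1))

    submit_index_job = DruidOperator(
        task_id='submit_index_job',
        json_index_file='index_spec.json',  # read with json.load() at construction time
        druid_ingest_conn_id='druid_ingest_default',
        dag=dag,
    )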


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/de99aa20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/de99aa20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/de99aa20

Branch: refs/heads/master
Commit: de99aa20f4ffaaf0757d339abcc96961172d238c
Parents: d22340a
Author: Fokko Driesprong <fokkodriesprong@godatadriven.com>
Authored: Fri Aug 18 21:34:03 2017 +0200
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Fri Aug 18 21:34:03 2017 +0200

----------------------------------------------------------------------
 airflow/contrib/operators/druid_operator.py    |  47 +++++
 airflow/hooks/druid_hook.py                    | 215 ++++++--------------
 airflow/operators/hive_to_druid.py             | 116 +++++++++--
 airflow/utils/db.py                            |   4 +
 scripts/ci/requirements.txt                    |   2 +-
 setup.py                                       |   5 +-
 tests/contrib/operators/test_druid_operator.py |  47 +++++
 tests/hooks/test_druid_hook.py                 |  99 +++++++++
 tests/operators/test_hive_to_druid.py          | 132 ++++++++++++
 9 files changed, 491 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/de99aa20/airflow/contrib/operators/druid_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/druid_operator.py b/airflow/contrib/operators/druid_operator.py
new file mode 100644
index 0000000..fa8b1e3
--- /dev/null
+++ b/airflow/contrib/operators/druid_operator.py
@@ -0,0 +1,47 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+from airflow.hooks.druid_hook import DruidHook
+from airflow.models import BaseOperator
+
+
+class DruidOperator(BaseOperator):
+    """
+    Allows one to submit a task directly to Druid
+
+    :param json_index_file: The filepath to the druid index specification
+    :type json_index_file: str
+    :param druid_ingest_conn_id: The connection id of the Druid overlord which accepts index jobs
+    :type druid_ingest_conn_id: str
+    """
+    template_fields = ('intervals',)
+    template_ext = ('.json',)
+
+    def __init__(
+            self,
+            json_index_file,
+            druid_ingest_conn_id='druid_ingest_default',
+            *args, **kwargs):
+
+        super(DruidOperator, self).__init__(*args, **kwargs)
+        self.conn_id = druid_ingest_conn_id
+
+        with open(json_index_file) as data_file:
+            self.index_spec = json.load(data_file)
+
+    def execute(self, context):
+        hook = DruidHook(druid_ingest_conn_id=self.conn_id)
+        hook.submit_indexing_job(json.dumps(self.index_spec))

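For reference, the json_index_file the operator loads is just a Druid ingestion spec on disk. A stripped-down,
hypothetical example of producing one (field values are illustrative only, and a real spec needs more fields,
such as a parser), written in Python so it matches how the operator reads it back with json.load:

    # Hypothetical minimal 'index_hadoop' spec, loosely mirroring the structure
    # that construct_ingest_query() in hive_to_druid.py (below) generates.
    import json

    index_spec = {
        "type": "index_hadoop",
        "spec": {
            "dataSchema": {
                "dataSource": "example_datasource",  # illustrative name
                "metricsSpec": [{"name": "count", "type": "count"}],
                "granularitySpec": {
                    "type": "uniform",
                    "segmentGranularity": "DAY",
                    "queryGranularity": "NONE",
                    "intervals": "2017-01-01/2017-01-02",
                },
            },
            "ioConfig": {
                "type": "hadoop",
                "inputSpec": {"type": "static", "paths": "/user/hive/warehouse/example"},
            },
        },
    }

    with open('index_spec.json', 'w') as f:
        json.dump(index_spec, f, indent=4)
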
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/de99aa20/airflow/hooks/druid_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
index 9f0a022..350c230 100644
--- a/airflow/hooks/druid_hook.py
+++ b/airflow/hooks/druid_hook.py
@@ -13,179 +13,80 @@
 # limitations under the License.
 
 from __future__ import print_function
-import logging
-import json
-import time
 
-from pydruid.client import PyDruid
+import logging
 import requests
+import time
 
-from airflow.hooks.base_hook import BaseHook
 from airflow.exceptions import AirflowException
-
-LOAD_CHECK_INTERVAL = 5
-DEFAULT_TARGET_PARTITION_SIZE = 5000000
-
-
-class AirflowDruidLoadException(AirflowException):
-    pass
+from airflow.hooks.base_hook import BaseHook
 
 
 class DruidHook(BaseHook):
-    '''
-    Interact with druid.
-    '''
+    """
+    Interact with the Druid overlord to submit and monitor indexing jobs
+
+    :param druid_ingest_conn_id: The connection id to the Druid overlord machine which accepts index jobs
+    :type druid_ingest_conn_id: string
+    :param timeout: The interval, in seconds, between polls of the Druid job for the status of the ingestion job
+    :type timeout: int
+    :param max_ingestion_time: The maximum ingestion time before assuming the job failed
+    :type max_ingestion_time: int
+    """
 
     def __init__(
             self,
-            druid_query_conn_id='druid_query_default',
-            druid_ingest_conn_id='druid_ingest_default'):
-        self.druid_query_conn_id = druid_query_conn_id
+            druid_ingest_conn_id='druid_ingest_default',
+            timeout=1,
+            max_ingestion_time=18000):
+
         self.druid_ingest_conn_id = druid_ingest_conn_id
+        self.timeout = timeout
+        self.max_ingestion_time = max_ingestion_time
         self.header = {'content-type': 'application/json'}
 
-    def get_conn(self):
-        """
-        Returns a druid connection object for query
-        """
-        conn = self.get_connection(self.druid_query_conn_id)
-        return PyDruid(
-            "http://{conn.host}:{conn.port}".format(**locals()),
-            conn.extra_dejson.get('endpoint', ''))
-
-    @property
-    def ingest_post_url(self):
+    def get_conn_url(self):
         conn = self.get_connection(self.druid_ingest_conn_id)
         host = conn.host
         port = conn.port
+        schema = conn.extra_dejson.get('schema', 'http')
         endpoint = conn.extra_dejson.get('endpoint', '')
         return "http://{host}:{port}/{endpoint}".format(**locals())
 
-    def get_ingest_status_url(self, task_id):
-        post_url = self.ingest_post_url
-        return "{post_url}/{task_id}/status".format(**locals())
-
-    def construct_ingest_query(
-            self, datasource, static_path, ts_dim, columns, metric_spec,
-            intervals, num_shards, target_partition_size,
-            query_granularity="NONE", segment_granularity="DAY",
-            hadoop_dependency_coordinates=None):
-        """
-        Builds an ingest query for an HDFS TSV load.
-
-        :param datasource: target datasource in druid
-        :param columns: list of all columns in the TSV, in the right order
-        """
-
-        # backward compatibilty for num_shards, but target_partition_size is the default setting
-        # and overwrites the num_shards
-        if target_partition_size == -1:
-            if num_shards == -1:
-                target_partition_size = DEFAULT_TARGET_PARTITION_SIZE
-        else:
-            num_shards = -1
-
-        metric_names = [
-            m['fieldName'] for m in metric_spec if m['type'] != 'count']
-        dimensions = [c for c in columns if c not in metric_names and c != ts_dim]
-        ingest_query_dict = {
-            "type": "index_hadoop",
-            "spec": {
-                "dataSchema": {
-                    "metricsSpec": metric_spec,
-                    "granularitySpec": {
-                        "queryGranularity": query_granularity,
-                        "intervals": intervals,
-                        "type": "uniform",
-                        "segmentGranularity": segment_granularity,
-                    },
-                    "parser": {
-                        "type": "string",
-                        "parseSpec": {
-                            "columns": columns,
-                            "dimensionsSpec": {
-                                "dimensionExclusions": [],
-                                "dimensions": dimensions,  # list of names
-                                "spatialDimensions": []
-                            },
-                            "timestampSpec": {
-                                "column": ts_dim,
-                                "format": "auto"
-                            },
-                            "format": "tsv"
-                        }
-                    },
-                    "dataSource": datasource
-                },
-                "tuningConfig": {
-                    "type": "hadoop",
-                    "jobProperties": {
-                        "mapreduce.job.user.classpath.first": "false",
-                        "mapreduce.map.output.compress": "false",
-                        "mapreduce.output.fileoutputformat.compress": "false",
-                    },
-                    "partitionsSpec": {
-                        "type": "hashed",
-                        "targetPartitionSize": target_partition_size,
-                        "numShards": num_shards,
-                    },
-                },
-                "ioConfig": {
-                    "inputSpec": {
-                        "paths": static_path,
-                        "type": "static"
-                    },
-                    "type": "hadoop"
-                }
-            }
-        }
-        if hadoop_dependency_coordinates:
-            ingest_query_dict[
-                'hadoopDependencyCoordinates'] = hadoop_dependency_coordinates
-
-        return json.dumps(ingest_query_dict, indent=4)
-
-    def send_ingest_query(
-            self, datasource, static_path, ts_dim, columns, metric_spec,
-            intervals, num_shards, target_partition_size, query_granularity, segment_granularity,
-            hadoop_dependency_coordinates=None):
-        query = self.construct_ingest_query(
-            datasource, static_path, ts_dim, columns,
-            metric_spec, intervals, num_shards, target_partition_size,
-            query_granularity, segment_granularity, hadoop_dependency_coordinates)
-        r = requests.post(
-            self.ingest_post_url, headers=self.header, data=query)
-        logging.info(self.ingest_post_url)
-        logging.info(query)
-        logging.info(r.text)
-        d = json.loads(r.text)
-        if "task" not in d:
-            raise AirflowDruidLoadException(
-                "[Error]: Ingesting data to druid failed.")
-        return d["task"]
-
-    def load_from_hdfs(
-            self, datasource, static_path,  ts_dim, columns,
-            intervals, num_shards, target_partition_size, query_granularity, segment_granularity,
-            metric_spec=None, hadoop_dependency_coordinates=None):
-        """
-        load data to druid from hdfs
-
-        :param ts_dim: The column name to use as a timestamp
-        :param metric_spec: A list of dictionaries
-        """
-        task_id = self.send_ingest_query(
-            datasource, static_path, ts_dim, columns, metric_spec,
-            intervals, num_shards, target_partition_size, query_granularity, segment_granularity,
-            hadoop_dependency_coordinates)
-        status_url = self.get_ingest_status_url(task_id)
-        while True:
-            r = requests.get(status_url)
-            d = json.loads(r.text)
-            if d['status']['status'] == 'FAILED':
-                logging.error(d)
-                raise AirflowDruidLoadException(
-                    "[Error]: Ingesting data to druid failed.")
-            elif d['status']['status'] == 'SUCCESS':
-                break
-            time.sleep(LOAD_CHECK_INTERVAL)
+    def submit_indexing_job(self, json_index_spec):
+        url = self.get_conn_url()
+
+        req_index = requests.post(url, data=json_index_spec, headers=self.header)
+        if (req_index.status_code != 200):
+            raise AirflowException("Did not get 200 when submitting the Druid job to {}".format(url))
+
+        req_json = req_index.json()
+        # Wait until the job is completed
+        druid_task_id = req_json['task']
+
+        running = True
+
+        sec = 0
+        while running:
+            req_status = requests.get("{0}/{1}/status".format(url, druid_task_id))
+
+            logging.info("Job still running for {0} seconds...".format(sec))
+
+            sec = sec + 1
+
+            if sec > self.max_ingestion_time:
+                raise AirflowException('Druid ingestion took more than {} seconds'.format(self.max_ingestion_time))
+
+            time.sleep(self.timeout)
+
+            status = req_status.json()['status']['status']
+            if status == 'RUNNING':
+                running = True
+            elif status == 'SUCCESS':
+                running = False  # Great success!
+            elif status == 'FAILED':
+                raise AirflowException('Druid indexing job failed, check console for more info')
+            else:
+                raise AirflowException('Could not get status of the job, got {0}'.format(status))
+
+        logging.info('Successful index')

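The hook can also be used on its own, outside an operator. A minimal sketch, assuming the default
'druid_ingest_default' connection seeded in airflow/utils/db.py (below) and a spec already serialized to JSON;
the placeholder spec here is not a valid ingestion job:

    # Hypothetical direct use of DruidHook; spec_json stands in for a real
    # Druid ingestion spec serialized with json.dumps().
    import json

    from airflow.hooks.druid_hook import DruidHook

    spec_json = json.dumps({"type": "index_hadoop", "spec": {}})  # placeholder spec

    hook = DruidHook(
        druid_ingest_conn_id='druid_ingest_default',
        timeout=5,                # seconds to sleep between status polls
        max_ingestion_time=3600,  # raise AirflowException if the job does not finish in time
    )
    hook.submit_indexing_job(spec_json)  # blocks until SUCCESS, raises on FAILED or timeout
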
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/de99aa20/airflow/operators/hive_to_druid.py
----------------------------------------------------------------------
diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py
index 17b8188..70d7825 100644
--- a/airflow/operators/hive_to_druid.py
+++ b/airflow/operators/hive_to_druid.py
@@ -19,6 +19,9 @@ from airflow.hooks.druid_hook import DruidHook
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 
+LOAD_CHECK_INTERVAL = 5
+DEFAULT_TARGET_PARTITION_SIZE = 5000000
+
 
 class HiveToDruidTransfer(BaseOperator):
     """
@@ -50,7 +53,6 @@ class HiveToDruidTransfer(BaseOperator):
 
     template_fields = ('sql', 'intervals')
     template_ext = ('.sql',)
-    #ui_color = '#a0e08c'
 
     @apply_defaults
     def __init__(
@@ -66,8 +68,8 @@ class HiveToDruidTransfer(BaseOperator):
             intervals=None,
             num_shards=-1,
             target_partition_size=-1,
-            query_granularity=None,
-            segment_granularity=None,
+            query_granularity="NONE",
+            segment_granularity="DAY",
             *args, **kwargs):
         super(HiveToDruidTransfer, self).__init__(*args, **kwargs)
         self.sql = sql
@@ -92,8 +94,8 @@ class HiveToDruidTransfer(BaseOperator):
         hive_table = 'druid.' + context['task_instance_key_str'].replace('.', '_')
         sql = self.sql.strip().strip(';')
         hql = """\
-        set mapred.output.compress=false;
-        set hive.exec.compress.output=false;
+        SET mapred.output.compress=false;
+        SET hive.exec.compress.output=false;
         DROP TABLE IF EXISTS {hive_table};
         CREATE TABLE {hive_table}
         ROW FORMAT DELIMITED FIELDS TERMINATED BY  '\t'
@@ -106,10 +108,12 @@ class HiveToDruidTransfer(BaseOperator):
         hive.run_cli(hql)
 
         m = HiveMetastoreHook(self.metastore_conn_id)
-        t = m.get_table(hive_table)
 
+        # Get the Hive table and extract the columns
+        t = m.get_table(hive_table)
         columns = [col.name for col in t.sd.cols]
 
+        # Get the path on hdfs
         hdfs_uri = m.get_table(hive_table).sd.location
         pos = hdfs_uri.find('/user')
         static_path = hdfs_uri[pos:]
@@ -117,17 +121,17 @@ class HiveToDruidTransfer(BaseOperator):
         schema, table = hive_table.split('.')
 
         druid = DruidHook(druid_ingest_conn_id=self.druid_ingest_conn_id)
-        logging.info("Inserting rows into Druid")
-        logging.info("HDFS path: " + static_path)
 
         try:
-            druid.load_from_hdfs(
-                datasource=self.druid_datasource,
-                intervals=self.intervals,
-                static_path=static_path, ts_dim=self.ts_dim,
-                columns=columns, num_shards=self.num_shards, target_partition_size=self.target_partition_size,
-                query_granularity=self.query_granularity, segment_granularity=self.segment_granularity,
-                metric_spec=self.metric_spec, hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
+            index_spec = self.construct_ingest_query(
+                static_path=static_path,
+                columns=columns,
+            )
+
+            logging.info("Inserting rows into Druid, hdfs path: {}".format(static_path))
+
+            druid.submit_indexing_job(index_spec)
+
             logging.info("Load seems to have succeeded!")
         finally:
             logging.info(
@@ -135,3 +139,85 @@ class HiveToDruidTransfer(BaseOperator):
                 "Hive table {}".format(hive_table))
             hql = "DROP TABLE IF EXISTS {}".format(hive_table)
             hive.run_cli(hql)
+
+    def construct_ingest_query(self, static_path, columns):
+        """
+        Builds an ingest query for an HDFS TSV load.
+
+        :param static_path: The path on hdfs where the data is
+        :type static_path: str
+        :param columns: List of all the columns that are available
+        :type columns: list
+        """
+
+        # backward compatibility for num_shards, but target_partition_size is the default setting
+        # and overwrites the num_shards
+        num_shards = self.num_shards
+        target_partition_size = self.target_partition_size
+        if self.target_partition_size == -1:
+            if self.num_shards == -1:
+                target_partition_size = DEFAULT_TARGET_PARTITION_SIZE
+        else:
+            num_shards = -1
+
+        metric_names = [m['fieldName'] for m in self.metric_spec if m['type'] != 'count']
+
+        # Take all the columns that are not the time dimension or a metric as the dimension columns
+        dimensions = [c for c in columns if c not in metric_names and c != self.ts_dim]
+
+        ingest_query_dict = {
+            "type": "index_hadoop",
+            "spec": {
+                "dataSchema": {
+                    "metricsSpec": self.metric_spec,
+                    "granularitySpec": {
+                        "queryGranularity": self.query_granularity,
+                        "intervals": self.intervals,
+                        "type": "uniform",
+                        "segmentGranularity": self.segment_granularity,
+                    },
+                    "parser": {
+                        "type": "string",
+                        "parseSpec": {
+                            "columns": columns,
+                            "dimensionsSpec": {
+                                "dimensionExclusions": [],
+                                "dimensions": dimensions,  # list of names
+                                "spatialDimensions": []
+                            },
+                            "timestampSpec": {
+                                "column": self.ts_dim,
+                                "format": "auto"
+                            },
+                            "format": "tsv"
+                        }
+                    },
+                    "dataSource": self.druid_datasource
+                },
+                "tuningConfig": {
+                    "type": "hadoop",
+                    "jobProperties": {
+                        "mapreduce.job.user.classpath.first": "false",
+                        "mapreduce.map.output.compress": "false",
+                        "mapreduce.output.fileoutputformat.compress": "false",
+                    },
+                    "partitionsSpec": {
+                        "type": "hashed",
+                        "targetPartitionSize": target_partition_size,
+                        "numShards": num_shards,
+                    },
+                },
+                "ioConfig": {
+                    "inputSpec": {
+                        "paths": static_path,
+                        "type": "static"
+                    },
+                    "type": "hadoop"
+                }
+            }
+        }
+
+        if self.hadoop_dependency_coordinates:
+            ingest_query_dict['hadoopDependencyCoordinates'] = self.hadoop_dependency_coordinates
+
+        return ingest_query_dict

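For context, the transfer operator is wired into a DAG roughly as sketched here; the values mirror the
hook_config used in tests/operators/test_hive_to_druid.py further down and are purely illustrative:

    # Hypothetical HiveToDruidTransfer usage; connection ids rely on the
    # Airflow defaults plus the new 'druid_ingest_default' connection.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.hive_to_druid import HiveToDruidTransfer

    dag = DAG('hive_to_druid_example', start_date=datetime(2017, 1, 1))

    hive_to_druid = HiveToDruidTransfer(
        task_id='hive_to_druid',
        sql='SELECT * FROM some_hive_table',  # illustrative query
        druid_datasource='example_datasource',
        ts_dim='event_time',                  # column used as the timestamp
        metric_spec=[{"name": "count", "type": "count"}],
        intervals='2016-01-01/2017-01-01',
        druid_ingest_conn_id='druid_ingest_default',
        dag=dag,
    )
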
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/de99aa20/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index 04b1512..35c187c 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -199,6 +199,10 @@ def initdb():
             host='yarn', extra='{"queue": "root.default"}'))
     merge_conn(
         models.Connection(
+            conn_id='druid_ingest_default', conn_type='druid',
+            host='druid-overlord', port=8081, extra='{"endpoint": "druid/indexer/v1/task"}'))
+    merge_conn(
+        models.Connection(
             conn_id='redis_default', conn_type='redis',
             host='localhost', port=6379,
             extra='{"db": 0}'))

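Given this seeded default connection, DruidHook.get_conn_url() resolves to the overlord's indexing endpoint.
A small sketch reproducing the same string assembly with the values above:

    # Reproduces the URL assembly in DruidHook.get_conn_url() for the
    # seeded 'druid_ingest_default' connection.
    host = 'druid-overlord'
    port = 8081
    endpoint = 'druid/indexer/v1/task'  # from the connection's extra JSON

    url = "http://{host}:{port}/{endpoint}".format(host=host, port=port, endpoint=endpoint)
    assert url == 'http://druid-overlord:8081/druid/indexer/v1/task'
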
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/de99aa20/scripts/ci/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index 670335c..d612d6f 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -66,7 +66,6 @@ parameterized
 paramiko>=2.1.1
 psutil>=4.2.0, <5.0.0
 psycopg2
-pydruid
 pygments
 pyhive
 pykerberos
@@ -79,6 +78,7 @@ redis
 rednose
 requests
 requests-kerberos
+requests_mock
 setproctitle
 slackclient
 sphinx

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/de99aa20/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index dedcf76..12e00ec 100644
--- a/setup.py
+++ b/setup.py
@@ -135,7 +135,6 @@ doc = [
     'Sphinx-PyPI-upload>=0.2.1'
 ]
 docker = ['docker-py>=1.6.0']
-druid = ['pydruid>=0.2.1']
 emr = ['boto3>=1.0.0']
 gcp_api = [
     'httplib2',
@@ -198,7 +197,8 @@ devel = [
     'nose-timer',
     'parameterized',
     'rednose',
-    'paramiko'
+    'paramiko',
+    'requests_mock'
 ]
 devel_minreq = devel + mysql + doc + password + s3 + cgroups
 devel_hadoop = devel_minreq + hive + hdfs + webhdfs + kerberos
@@ -266,7 +266,6 @@ def do_setup():
             'devel_hadoop': devel_hadoop,
             'doc': doc,
             'docker': docker,
-            'druid': druid,
             'emr': emr,
             'gcp_api': gcp_api,
             'github_enterprise': github_enterprise,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/de99aa20/tests/contrib/operators/test_druid_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_druid_operator.py b/tests/contrib/operators/test_druid_operator.py
new file mode 100644
index 0000000..aa5cb61
--- /dev/null
+++ b/tests/contrib/operators/test_druid_operator.py
@@ -0,0 +1,47 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import datetime
+import mock
+import unittest
+
+from airflow import DAG, configuration
+from airflow.contrib.operators.druid_operator import DruidOperator
+
+
+class TestDruidOperator(unittest.TestCase):
+    def setUp(self):
+        configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': datetime.datetime(2017, 1, 1)
+        }
+        self.dag = DAG('test_dag_id', default_args=args)
+
+    def test_read_spec_from_file(self):
+        m = mock.mock_open(read_data='{"some": "json"}')
+        with mock.patch('airflow.contrib.operators.druid_operator.open', m, create=True) as m:
+            druid = DruidOperator(
+                task_id='druid_indexing_job',
+                json_index_file='index_spec.json',
+                dag=self.dag
+            )
+
+            m.assert_called_once_with('index_spec.json')
+            self.assertEqual(druid.index_spec, {'some': 'json'})
+
+
+if __name__ == '__main__':
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/de99aa20/tests/hooks/test_druid_hook.py
----------------------------------------------------------------------
diff --git a/tests/hooks/test_druid_hook.py b/tests/hooks/test_druid_hook.py
new file mode 100644
index 0000000..c049cb2
--- /dev/null
+++ b/tests/hooks/test_druid_hook.py
@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import requests
+import requests_mock
+import unittest
+
+from airflow.exceptions import AirflowException
+from airflow.hooks.druid_hook import DruidHook
+
+
+class TestDruidHook(unittest.TestCase):
+
+    def setUp(self):
+        super(TestDruidHook, self).setUp()
+
+        session = requests.Session()
+        adapter = requests_mock.Adapter()
+        session.mount('mock', adapter)
+
+    @requests_mock.mock()
+    def test_submit_gone_wrong(self, m):
+        hook = DruidHook()
+        m.post(
+            'http://druid-overlord:8081/druid/indexer/v1/task',
+            text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
+        )
+        m.get(
+            'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
+            text='{"status":{"status": "FAILED"}}'
+        )
+
+        # The job failed for some reason
+        with self.assertRaises(AirflowException):
+            hook.submit_indexing_job('Long json file')
+
+    @requests_mock.mock()
+    def test_submit_ok(self, m):
+        hook = DruidHook()
+        m.post(
+            'http://druid-overlord:8081/druid/indexer/v1/task',
+            text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
+        )
+        m.get(
+            'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
+            text='{"status":{"status": "SUCCESS"}}'
+        )
+
+        # Exists just as it should
+        hook.submit_indexing_job('Long json file')
+
+    @requests_mock.mock()
+    def test_submit_unknown_response(self, m):
+        hook = DruidHook()
+        m.post(
+            'http://druid-overlord:8081/druid/indexer/v1/task',
+            text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
+        )
+        m.get(
+            'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
+            text='{"status":{"status": "UNKNOWN"}}'
+        )
+
+        # An unknown error code
+        with self.assertRaises(AirflowException):
+            hook.submit_indexing_job('Long json file')
+
+    @requests_mock.mock()
+    def test_submit_timeout(self, m):
+        hook = DruidHook(timeout=0, max_ingestion_time=5)
+        m.post(
+            'http://druid-overlord:8081/druid/indexer/v1/task',
+            text='{"task":"9f8a7359-77d4-4612-b0cd-cc2f6a3c28de"}'
+        )
+        m.get(
+            'http://druid-overlord:8081/druid/indexer/v1/task/9f8a7359-77d4-4612-b0cd-cc2f6a3c28de/status',
+            text='{"status":{"status": "RUNNING"}}'
+        )
+
+        # Because the jobs keeps running
+        with self.assertRaises(AirflowException):
+            hook.submit_indexing_job('Long json file')
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/de99aa20/tests/operators/test_hive_to_druid.py
----------------------------------------------------------------------
diff --git a/tests/operators/test_hive_to_druid.py b/tests/operators/test_hive_to_druid.py
new file mode 100644
index 0000000..fb8ce4e
--- /dev/null
+++ b/tests/operators/test_hive_to_druid.py
@@ -0,0 +1,132 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import requests
+import requests_mock
+import unittest
+
+from airflow import DAG, configuration
+from airflow.operators.hive_to_druid import HiveToDruidTransfer
+
+
+class TestDruidHook(unittest.TestCase):
+
+    # To debug the large json diff
+    maxDiff = None
+
+    hook_config = {
+        'sql': 'SELECT * FROM table',
+        'druid_datasource': 'our_datasource',
+        'ts_dim': 'timedimension_column',
+        'metric_spec': [
+            {"name": "count", "type": "count"},
+            {"name": "amountSum", "type": "doubleSum", "fieldName": "amount"}
+        ],
+        'hive_cli_conn_id': 'hive_cli_custom',
+        'druid_ingest_conn_id': 'druid_ingest_default',
+        'metastore_conn_id': 'metastore_default',
+        'hadoop_dependency_coordinates': 'org.apache.spark:spark-core_2.10:1.5.2-mmx1',
+        'intervals': '2016-01-01/2017-01-01',
+        'num_shards': -1,
+        'target_partition_size': 1925,
+        'query_granularity': 'month',
+        'segment_granularity': 'week'
+    }
+
+    index_spec_config = {
+        'static_path': '/apps/db/warehouse/hive/',
+        'columns': ['country', 'segment']
+    }
+
+    def setUp(self):
+        super(TestDruidHook, self).setUp()
+
+        configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': '2017-01-01'
+        }
+        self.dag = DAG('hive_to_druid', default_args=args)
+
+        session = requests.Session()
+        adapter = requests_mock.Adapter()
+        session.mount('mock', adapter)
+
+    def test_construct_ingest_query(self):
+        operator = HiveToDruidTransfer(
+            task_id='hive_to_druid',
+            dag=self.dag,
+            **self.hook_config
+        )
+
+        provided_index_spec = operator.construct_ingest_query(
+            **self.index_spec_config
+        )
+
+        expected_index_spec = {
+            "hadoopDependencyCoordinates": self.hook_config['hadoop_dependency_coordinates'],
+            "type": "index_hadoop",
+            "spec": {
+                "dataSchema": {
+                    "metricsSpec": self.hook_config['metric_spec'],
+                    "granularitySpec": {
+                        "queryGranularity": self.hook_config['query_granularity'],
+                        "intervals": self.hook_config['intervals'],
+                        "type": "uniform",
+                        "segmentGranularity": self.hook_config['segment_granularity'],
+                    },
+                    "parser": {
+                        "type": "string",
+                        "parseSpec": {
+                            "columns": self.index_spec_config['columns'],
+                            "dimensionsSpec": {
+                                "dimensionExclusions": [],
+                                "dimensions": self.index_spec_config['columns'],
+                                "spatialDimensions": []
+                            },
+                            "timestampSpec": {
+                                "column": self.hook_config['ts_dim'],
+                                "format": "auto"
+                            },
+                            "format": "tsv"
+                        }
+                    },
+                    "dataSource": self.hook_config['druid_datasource']
+                },
+                "tuningConfig": {
+                    "type": "hadoop",
+                    "jobProperties": {
+                        "mapreduce.job.user.classpath.first": "false",
+                        "mapreduce.map.output.compress": "false",
+                        "mapreduce.output.fileoutputformat.compress": "false",
+                    },
+                    "partitionsSpec": {
+                        "type": "hashed",
+                        "targetPartitionSize": self.hook_config['target_partition_size'],
+                        "numShards": self.hook_config['num_shards'],
+                    },
+                },
+                "ioConfig": {
+                    "inputSpec": {
+                        "paths": self.index_spec_config['static_path'],
+                        "type": "static"
+                    },
+                    "type": "hadoop"
+                }
+            }
+        }
+
+        # Make sure it is like we expect it
+        self.assertEqual(provided_index_spec, expected_index_spec)

