airflow-commits mailing list archives

From "Bolke de Bruin (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (AIRFLOW-2146) Initialize default Google BigQuery Connection with valid conn_type & Fix broken DBApiHook
Date Sun, 25 Feb 2018 11:17:00 GMT

     [ https://issues.apache.org/jira/browse/AIRFLOW-2146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bolke de Bruin resolved AIRFLOW-2146.
-------------------------------------
       Resolution: Fixed
    Fix Version/s: 1.10.0

Issue resolved by pull request #3073
[https://github.com/apache/incubator-airflow/pull/3073]

> Initialize default Google BigQuery Connection with valid conn_type & Fix broken DBApiHook
> -----------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-2146
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2146
>             Project: Apache Airflow
>          Issue Type: Task
>          Components: contrib, gcp
>            Reporter: Kaxil Naik
>            Assignee: Kaxil Naik
>            Priority: Major
>             Fix For: 1.10.0
>
>
> `airflow initdb` creates a connection with conn_id='bigquery_default' and conn_type='bigquery'.
> However, 'bigquery' is not a valid conn_type according to models.Connection._types, and
> BigQuery connections should instead use the google_cloud_platform conn_type.
> Also, as [renanleme|https://github.com/renanleme] mentioned [here|https://github.com/apache/incubator-airflow/pull/3031#issuecomment-368132910],
> the DAGs he created break when calling `get_records()` on BigQueryHook, which
> extends DbApiHook.
> *Error Log*:
> {code}
> Traceback (most recent call last):
>   File "/src/apache-airflow/airflow/models.py", line 1519, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File "/airflow/dags/lib/operators/test_operator.py", line 21, in execute
>     records = self._get_db_hook(self.source_conn_id).get_records(self.sql)
>   File "/src/apache-airflow/airflow/hooks/base_hook.py", line 92, in get_records
>     raise NotImplementedError()
> {code}
> *Dag*:
> {code:python}
> from datetime import datetime
> from airflow import DAG
> from lib.operators.test_operator import TestOperator
> default_args = {
>     'depends_on_past': False,
>     'start_date': datetime(2018, 2, 21),
> }
> dag = DAG(
>     'test_dag',
>     default_args=default_args,
>     schedule_interval='0 6 * * *'
> )
> sql = '''
>     SELECT id from YOUR_BIGQUERY_TABLE limit 10
> '''
> compare_grouped_event = TestOperator(
>     task_id='test_operator',
>     source_conn_id='gcp_airflow',
>     sql=sql,
>     dag=dag
> )
> {code}
> *Operator*:
> {code:python}
> from airflow.hooks.base_hook import BaseHook
> from airflow.models import BaseOperator
> from airflow.utils.decorators import apply_defaults
> class TestOperator(BaseOperator):
>
>     @apply_defaults
>     def __init__(
>             self,
>             sql,
>             source_conn_id=None,
>             *args, **kwargs):
>         super(TestOperator, self).__init__(*args, **kwargs)
>         self.sql = sql
>         self.source_conn_id = source_conn_id
>
>     def execute(self, context=None):
>         records = self._get_db_hook(self.source_conn_id).get_records(self.sql)
>         self.log.info('Fetched records from source')
>
>     @staticmethod
>     def _get_db_hook(conn_id):
>         return BaseHook.get_hook(conn_id=conn_id)
> {code}
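For readers following along, the failure mode above can be sketched with a minimal, self-contained model of conn_type-based hook dispatch. This is an illustration only, not Airflow's actual implementation: the class names and the `HOOKS_BY_CONN_TYPE` registry are hypothetical stand-ins for the way `BaseHook.get_hook()` selects a hook class from a connection's conn_type, with the base class's `get_records()` raising `NotImplementedError` as in airflow/hooks/base_hook.py.

```python
class BaseHook(object):
    """Simplified stand-in for airflow.hooks.base_hook.BaseHook."""

    def get_records(self, sql):
        # Base fallback: no concrete implementation, mirroring the
        # NotImplementedError seen in the traceback above.
        raise NotImplementedError()


class GoogleCloudPlatformHook(BaseHook):
    """Stand-in for a hook with a working DbApiHook-style get_records()."""

    def get_records(self, sql):
        return [('row-1',), ('row-2',)]


# Hypothetical registry: only recognised conn_types map to working hooks.
HOOKS_BY_CONN_TYPE = {'google_cloud_platform': GoogleCloudPlatformHook}


def get_hook(conn_type):
    # An unrecognised conn_type (e.g. the invalid 'bigquery') falls back
    # to the base class, whose get_records() raises NotImplementedError.
    return HOOKS_BY_CONN_TYPE.get(conn_type, BaseHook)()


get_hook('google_cloud_platform').get_records('SELECT 1')  # returns rows
try:
    get_hook('bigquery').get_records('SELECT 1')  # invalid conn_type
except NotImplementedError:
    print("no concrete hook registered for conn_type 'bigquery'")
```

This is why the fix in pull request #3073 initializes `bigquery_default` with conn_type='google_cloud_platform': with a recognised conn_type, dispatch reaches a hook that actually implements `get_records()`.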



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
