airflow-commits mailing list archives

From "ASF subversion and git services (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-2146) Initialize default Google BigQuery Connection with valid conn_type & Fix broken DBApiHook
Date Sun, 25 Feb 2018 11:17:00 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-2146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16376038#comment-16376038 ]

ASF subversion and git services commented on AIRFLOW-2146:
----------------------------------------------------------

Commit f2d10ef7b465080584fe96590775b6a2696a38ef in incubator-airflow's branch refs/heads/master
from [~kaxilnaik]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=f2d10ef ]

[AIRFLOW-2146] Resolve issues with BQ using DbApiHook methods

- Resolves issues with using methods like
`get_records()` from `BigQueryHook`, which
inherits from `DbApiHook`.
- Reverts one changed file from
https://github.com/apache/incubator-airflow/pull/3031 to use `BigQueryHook` again
instead of `GoogleCloudBaseHook`.
- Fixes the `conn_type` of the `bigquery_default` connection.

Closes #3073 from kaxil/AIRFLOW-2146
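
The `conn_type` fix amounts to seeding `bigquery_default` with a type that appears in the list of known connection types. Below is a minimal sketch in plain Python, not Airflow's actual code. It assumes a three-entry subset of `models.Connection._types` (the real list is much longer) and uses a hypothetical `default_bigquery_connection()` helper to stand in for what `airflow initdb` seeds.

```python
# Assumed, illustrative subset of models.Connection._types: (key, label) pairs.
VALID_CONN_TYPES = [
    ('google_cloud_platform', 'Google Cloud Platform'),
    ('mysql', 'MySQL'),
    ('postgres', 'Postgres'),
]


def is_valid_conn_type(conn_type, types=VALID_CONN_TYPES):
    """Return True if conn_type equals the key of one of the (key, label) pairs."""
    return any(key == conn_type for key, _label in types)


def default_bigquery_connection():
    """Hypothetical helper: the fields initdb should seed for bigquery_default."""
    conn = {'conn_id': 'bigquery_default', 'conn_type': 'google_cloud_platform'}
    # Refuse to seed a connection whose type is not a known key.
    if not is_valid_conn_type(conn['conn_type']):
        raise ValueError('invalid conn_type: %s' % conn['conn_type'])
    return conn


print(is_valid_conn_type('bigquery'))              # False
print(default_bigquery_connection()['conn_type'])  # google_cloud_platform
```

Validating against the keys rather than the labels matches how the issue phrases the problem: `bigquery` is simply not one of the keys.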


> Initialize default Google BigQuery Connection with valid conn_type & Fix broken DBApiHook
> -----------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-2146
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2146
>             Project: Apache Airflow
>          Issue Type: Task
>          Components: contrib, gcp
>            Reporter: Kaxil Naik
>            Assignee: Kaxil Naik
>            Priority: Major
>             Fix For: 1.10.0
>
>
> `airflow initdb` creates a connection with `conn_id='bigquery_default'` and `conn_type='bigquery'`.
> However, `bigquery` is not a valid `conn_type` according to `models.Connection._types`; BigQuery
> connections should use the `google_cloud_platform` conn_type.
> Also, as [renanleme|https://github.com/renanleme] mentioned [here|https://github.com/apache/incubator-airflow/pull/3031#issuecomment-368132910],
> the DAGs he created break when he uses `get_records()` from `BigQueryHook`, which
> inherits from `DbApiHook`.
> *Error Log*:
> {code}
> Traceback (most recent call last):
>   File "/src/apache-airflow/airflow/models.py", line 1519, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File "/airflow/dags/lib/operators/test_operator.py", line 21, in execute
>     records = self._get_db_hook(self.source_conn_id).get_records(self.sql)
>   File "/src/apache-airflow/airflow/hooks/base_hook.py", line 92, in get_records
>     raise NotImplementedError()
> {code}
> *Dag*:
> {code:python}
> from datetime import datetime
> from airflow import DAG
> from lib.operators.test_operator import TestOperator
> default_args = {
>     'depends_on_past': False,
>     'start_date': datetime(2018, 2, 21),
> }
> dag = DAG(
>     'test_dag',
>     default_args=default_args,
>     schedule_interval='0 6 * * *'
> )
> sql = '''
>     SELECT id from YOUR_BIGQUERY_TABLE limit 10
> '''
> compare_grouped_event = TestOperator(
>     task_id='test_operator',
>     source_conn_id='gcp_airflow',
>     sql=sql,
>     dag=dag
> )
> {code}
> *Operator*:
> {code:python}
> from airflow.hooks.base_hook import BaseHook
> from airflow.models import BaseOperator
> from airflow.utils.decorators import apply_defaults
> class TestOperator(BaseOperator):
>     @apply_defaults
>     def __init__(
>             self,
>             sql,
>             source_conn_id=None,
>             *args, **kwargs):
>         super(TestOperator, self).__init__(*args, **kwargs)
>         self.sql = sql
>         self.source_conn_id = source_conn_id
>     def execute(self, context=None):
>         records = self._get_db_hook(self.source_conn_id).get_records(self.sql)
>         self.log.info('Fetched records from source')
>     @staticmethod
>     def _get_db_hook(conn_id):
>         return BaseHook.get_hook(conn_id=conn_id)
> {code}
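
The `NotImplementedError` in the traceback can be reproduced with simplified stand-in classes. This is a sketch, not Airflow's source. The class names mirror Airflow's hooks, but their bodies are reduced. The `get_hook()` function and its `use_bigquery_hook` flag are hypothetical, and an in-memory SQLite connection replaces the real BigQuery connection. `BaseHook.get_records()` raises, `DbApiHook` implements it on top of `get_conn()`, and `BigQueryHook` inherits that implementation through multiple inheritance. A hook that only derives from `GoogleCloudBaseHook` therefore has no working `get_records()`, which is consistent with the commit's switch back to `BigQueryHook`.

```python
import sqlite3
from contextlib import closing


class BaseHook(object):
    """Stand-in for BaseHook: get_records is declared but not implemented."""
    def get_records(self, sql):
        raise NotImplementedError()


class DbApiHook(BaseHook):
    """Stand-in for DbApiHook: implements get_records on top of get_conn()."""
    def get_conn(self):
        raise NotImplementedError()

    def get_records(self, sql):
        # Open a DB-API connection and cursor, run the query, return all rows.
        with closing(self.get_conn()) as conn, closing(conn.cursor()) as cur:
            cur.execute(sql)
            return cur.fetchall()


class GoogleCloudBaseHook(BaseHook):
    """Stand-in: credentials handling only; get_records stays unimplemented."""


class BigQueryHook(GoogleCloudBaseHook, DbApiHook):
    """Stand-in: the MRO finds DbApiHook.get_records before BaseHook's."""
    def get_conn(self):
        # Assumption: SQLite replaces the BigQuery DB-API connection here.
        conn = sqlite3.connect(':memory:')
        conn.execute('CREATE TABLE t (id INTEGER)')
        conn.executemany('INSERT INTO t VALUES (?)', [(1,), (2,)])
        return conn


def get_hook(conn_type, use_bigquery_hook=True):
    """Hypothetical dispatch on conn_type, in the spirit of Connection.get_hook."""
    if conn_type == 'google_cloud_platform':
        return BigQueryHook() if use_bigquery_hook else GoogleCloudBaseHook()
    raise ValueError('unsupported conn_type: %s' % conn_type)


print(get_hook('google_cloud_platform').get_records('SELECT id FROM t ORDER BY id'))
# [(1,), (2,)]
try:
    get_hook('google_cloud_platform', use_bigquery_hook=False).get_records('SELECT 1')
except NotImplementedError:
    print('NotImplementedError raised')
```

The base order `(GoogleCloudBaseHook, DbApiHook)` still resolves `get_records` to `DbApiHook`, because `GoogleCloudBaseHook` does not override it and `BaseHook` comes last in the method resolution order.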



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)