Subject: Re: [1/3] incubator-airflow git commit: use targetPartitionSize as the default partition spec
From: Chris Riccomini <criccomini@apache.org>
To: Chris Riccomini
Cc: dev@airflow.incubator.apache.org, commits@airflow.incubator.apache.org
Date: Mon, 23 May 2016 20:29:52 -0700

s/two/too ..
sigh

On Mon, May 23, 2016 at 8:29 PM, Chris Riccomini <criccomini@apache.org> wrote:

> Ah, yea. I get bitten by that two. It's annoying to have to ask people to
> add a JIRA to their commit message. And we can't squash through GitHub
> anymore. :( Wonder if the airflow-pr script allows us to edit it? I think
> it might....
>
> On Mon, May 23, 2016 at 5:50 PM, Dan Davydov <
> dan.davydov@airbnb.com.invalid> wrote:
>
>> Yep sorry will check the versions in the future. My own commits have JIRA
>> labels but I haven't validated that other users have done this for theirs
>> when I merge their commits (as the LGTM is delegated to either another
>> committer or the owner of a particular operator). Will be more vigilant in
>> the future.
>>
>> On Mon, May 23, 2016 at 5:07 PM, Chris Riccomini <criccomini@apache.org>
>> wrote:
>>
>> > Hey Dan,
>> >
>> > Could you please file JIRAs, and put the JIRA name as the prefix to your
>> > commits?
>> >
>> > Cheers,
>> > Chris
>> >
>> > On Mon, May 23, 2016 at 5:01 PM, <davydov@apache.org> wrote:
>> >
>> >> Repository: incubator-airflow
>> >> Updated Branches:
>> >>   refs/heads/airbnb_rb1.7.1_4 1d0d8681d -> 6f7ea90ae
>> >>
>> >>
>> >> use targetPartitionSize as the default partition spec
>> >>
>> >>
>> >> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
>> >> Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b58b5e09
>> >> Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b58b5e09
>> >> Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b58b5e09
>> >>
>> >> Branch: refs/heads/airbnb_rb1.7.1_4
>> >> Commit: b58b5e09578d8a0df17b4de12fe3b49792e9feda
>> >> Parents: 1d0d868
>> >> Author: Hongbo Zeng <hongbo.zeng@airbnb.com>
>> >> Authored: Sat May 14 17:00:42 2016 -0700
>> >> Committer: Dan Davydov <dan.davydov@airbnb.com>
>> >> Committed: Mon May 23 16:59:52 2016 -0700
>> >>
>> >> ----------------------------------------------------------------------
>> >>  airflow/hooks/druid_hook.py        | 23 ++++++++++++++++-------
>> >>  airflow/operators/hive_to_druid.py |  8 +++++---
>> >>  2 files changed, 21 insertions(+), 10 deletions(-)
>> >> ----------------------------------------------------------------------
>> >>
>> >>
>> >> http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b58b5e09/airflow/hooks/druid_hook.py
>> >> ----------------------------------------------------------------------
>> >> diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
>> >> index b6cb231..7c80c7c 100644
>> >> --- a/airflow/hooks/druid_hook.py
>> >> +++ b/airflow/hooks/druid_hook.py
>> >> @@ -10,7 +10,7 @@ from airflow.hooks.base_hook import BaseHook
>> >>  from airflow.exceptions import AirflowException
>> >>
>> >>  LOAD_CHECK_INTERVAL = 5
>> >> -
>> >> +TARGET_PARTITION_SIZE = 5000000
>> >>
>> >>  class AirflowDruidLoadException(AirflowException):
>> >>      pass
>> >> @@ -52,13 +52,22 @@ class DruidHook(BaseHook):
>> >>
>> >>      def construct_ingest_query(
>> >>              self, datasource, static_path, ts_dim, columns, metric_spec,
>> >> -            intervals, num_shards, hadoop_dependency_coordinates=None):
>> >> +            intervals, num_shards, target_partition_size, hadoop_dependency_coordinates=None):
>> >>          """
>> >>          Builds an ingest query for an HDFS TSV load.
>> >>
>> >>          :param datasource: target datasource in druid
>> >>          :param columns: list of all columns in the TSV, in the right order
>> >>          """
>> >> +
>> >> +        # backward compatibilty for num_shards, but target_partition_size is the default setting
>> >> +        # and overwrites the num_shards
>> >> +        if target_partition_size == -1:
>> >> +            if num_shards == -1:
>> >> +                target_partition_size = TARGET_PARTITION_SIZE
>> >> +        else:
>> >> +            num_shards = -1
>> >> +
>> >>          metric_names = [
>> >>              m['fieldName'] for m in metric_spec if m['type'] != 'count']
>> >>          dimensions = [c for c in columns if c not in metric_names and c != ts_dim]
>> >> @@ -100,7 +109,7 @@ class DruidHook(BaseHook):
>> >>                  },
>> >>                  "partitionsSpec" : {
>> >>                      "type" : "hashed",
>> >> -                    "targetPartitionSize" : -1,
>> >> +                    "targetPartitionSize" : target_partition_size,
>> >>                      "numShards" : num_shards,
>> >>                  },
>> >>              },
>> >> @@ -121,10 +130,10 @@ class DruidHook(BaseHook):
>> >>
>> >>      def send_ingest_query(
>> >>              self, datasource, static_path, ts_dim, columns, metric_spec,
>> >> -            intervals, num_shards, hadoop_dependency_coordinates=None):
>> >> +            intervals, num_shards, target_partition_size, hadoop_dependency_coordinates=None):
>> >>          query = self.construct_ingest_query(
>> >>              datasource, static_path, ts_dim, columns,
>> >> -            metric_spec, intervals, num_shards, hadoop_dependency_coordinates)
>> >> +            metric_spec, intervals, num_shards, target_partition_size, hadoop_dependency_coordinates)
>> >>          r = requests.post(
>> >>              self.ingest_post_url, headers=self.header, data=query)
>> >>          logging.info(self.ingest_post_url)
>> >> @@ -138,7 +147,7 @@ class DruidHook(BaseHook):
>> >>
>> >>      def load_from_hdfs(
>> >>              self, datasource, static_path,  ts_dim, columns,
>> >> -            intervals, num_shards, metric_spec=None, hadoop_dependency_coordinates=None):
>> >> +            intervals, num_shards, target_partition_size, metric_spec=None, hadoop_dependency_coordinates=None):
>> >>          """
>> >>          load data to druid from hdfs
>> >>          :params ts_dim: The column name to use as a timestamp
>> >> @@ -146,7 +155,7 @@ class DruidHook(BaseHook):
>> >>          """
>> >>          task_id = self.send_ingest_query(
>> >>              datasource, static_path, ts_dim, columns, metric_spec,
>> >> -            intervals, num_shards, hadoop_dependency_coordinates)
>> >> +            intervals, num_shards, target_partition_size, hadoop_dependency_coordinates)
>> >>          status_url = self.get_ingest_status_url(task_id)
>> >>          while True:
>> >>              r = requests.get(status_url)
>> >>
>> >>
>> >> http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b58b5e09/airflow/operators/hive_to_druid.py
>> >> ----------------------------------------------------------------------
>> >> diff --git a/airflow/operators/hive_to_druid.py b/airflow/operators/hive_to_druid.py
>> >> index 1346841..420aeed 100644
>> >> --- a/airflow/operators/hive_to_druid.py
>> >> +++ b/airflow/operators/hive_to_druid.py
>> >> @@ -49,7 +49,8 @@ class HiveToDruidTransfer(BaseOperator):
>> >>              metastore_conn_id='metastore_default',
>> >>              hadoop_dependency_coordinates=None,
>> >>              intervals=None,
>> >> -            num_shards=1,
>> >> +            num_shards=-1,
>> >> +            target_partition_size=-1,
>> >>              *args, **kwargs):
>> >>          super(HiveToDruidTransfer, self).__init__(*args, **kwargs)
>> >>          self.sql = sql
>> >> @@ -57,6 +58,7 @@ class HiveToDruidTransfer(BaseOperator):
>> >>          self.ts_dim = ts_dim
>> >>          self.intervals = intervals or ['{{ ds }}/{{ tomorrow_ds }}']
>> >>          self.num_shards = num_shards
>> >> +        self.target_partition_size = target_partition_size
>> >>          self.metric_spec = metric_spec or [{
>> >>              "name": "count",
>> >>              "type": "count"}]
>> >> @@ -103,8 +105,8 @@ class HiveToDruidTransfer(BaseOperator):
>> >>              datasource=self.druid_datasource,
>> >>              intervals=self.intervals,
>> >>              static_path=static_path, ts_dim=self.ts_dim,
>> >> -            columns=columns, num_shards=self.num_shards, metric_spec=self.metric_spec,
>> >> -            hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
>> >> +            columns=columns, num_shards=self.num_shards, target_partition_size=self.target_partition_size,
>> >> +            metric_spec=self.metric_spec, hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
>> >>          logging.info("Load seems to have succeeded!")
>> >>
>> >>          logging.info(
>> >>
>> >>
>> >
>>
>
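
For readers skimming the thread, here is a minimal self-contained sketch of how the two -1 sentinels in the diff above appear to interact. It only mirrors the branch added to construct_ingest_query and the TARGET_PARTITION_SIZE default; resolve_partition_spec is a hypothetical helper for illustration, not part of the hook.

# Illustration only, not part of the commit: mirrors the backward-compatibility
# branch added to DruidHook.construct_ingest_query in the diff above. The
# helper name resolve_partition_spec is hypothetical.
TARGET_PARTITION_SIZE = 5000000  # default taken from the diff


def resolve_partition_spec(num_shards=-1, target_partition_size=-1):
    if target_partition_size == -1:
        if num_shards == -1:
            # Neither knob set: fall back to the target partition size default.
            target_partition_size = TARGET_PARTITION_SIZE
    else:
        # An explicit target_partition_size wins; explicit sharding is disabled.
        num_shards = -1
    return {
        "type": "hashed",
        "targetPartitionSize": target_partition_size,
        "numShards": num_shards,
    }


# Neither argument given: default target partition size, no fixed shard count.
assert resolve_partition_spec() == {
    "type": "hashed", "targetPartitionSize": 5000000, "numShards": -1}
# Old-style callers that only pass num_shards keep their behaviour.
assert resolve_partition_spec(num_shards=4) == {
    "type": "hashed", "targetPartitionSize": -1, "numShards": 4}
# An explicit target_partition_size overrides any num_shards value.
assert resolve_partition_spec(num_shards=4, target_partition_size=100000) == {
    "type": "hashed", "targetPartitionSize": 100000, "numShards": -1}

Under that reading, existing DAGs that still pass num_shards to HiveToDruidTransfer keep working, while DAGs that set neither parameter get size-based hashed partitioning by default.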