airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chris Riccomini <criccom...@apache.org>
Subject Re: [1/3] incubator-airflow git commit: use targetPartitionSize as the default partition spec
Date Tue, 24 May 2016 03:29:52 GMT
s/two/too .. sigh

On Mon, May 23, 2016 at 8:29 PM, Chris Riccomini <criccomini@apache.org>
wrote:

> Ah, yea. I get bitten by that two. It's annoying to have to ask people to
> add a JIRA to their commit message. And we can't squash through GitHub
> anymore. :( Wonder if the airflow-pr script allows us to edit it? I think
> it might....
>
> On Mon, May 23, 2016 at 5:50 PM, Dan Davydov <
> dan.davydov@airbnb.com.invalid> wrote:
>
>> Yep sorry will check the versions in the future. My own commits have JIRA
>> labels but I haven't validated that other users have done this for theirs
>> when I merge their commits (as the LGTM is delegated to either another
>> committer or the owner of a particular operator). Will be more vigilant in
>> the future.
>>
>> On Mon, May 23, 2016 at 5:07 PM, Chris Riccomini <criccomini@apache.org>
>> wrote:
>>
>> > Hey Dan,
>> >
>> > Could you please file JIRAs, and put the JIRA name as the prefix to your
>> > commits?
>> >
>> > Cheers,
>> > Chris
>> >
>> > On Mon, May 23, 2016 at 5:01 PM, <davydov@apache.org> wrote:
>> >
>> >> Repository: incubator-airflow
>> >> Updated Branches:
>> >>   refs/heads/airbnb_rb1.7.1_4 1d0d8681d -> 6f7ea90ae
>> >>
>> >>
>> >> use targetPartitionSize as the default partition spec
>> >>
>> >>
>> >> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
>> >> Commit:
>> >>
>> http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b58b5e09
>> >> Tree:
>> >> http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b58b5e09
>> >> Diff:
>> >> http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b58b5e09
>> >>
>> >> Branch: refs/heads/airbnb_rb1.7.1_4
>> >> Commit: b58b5e09578d8a0df17b4de12fe3b49792e9feda
>> >> Parents: 1d0d868
>> >> Author: Hongbo Zeng <hongbo.zeng@airbnb.com>
>> >> Authored: Sat May 14 17:00:42 2016 -0700
>> >> Committer: Dan Davydov <dan.davydov@airbnb.com>
>> >> Committed: Mon May 23 16:59:52 2016 -0700
>> >>
>> >> ----------------------------------------------------------------------
>> >>  airflow/hooks/druid_hook.py        | 23 ++++++++++++++++-------
>> >>  airflow/operators/hive_to_druid.py |  8 +++++---
>> >>  2 files changed, 21 insertions(+), 10 deletions(-)
>> >> ----------------------------------------------------------------------
>> >>
>> >>
>> >>
>> >>
>> http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b58b5e09/airflow/hooks/druid_hook.py
>> >> ----------------------------------------------------------------------
>> >> diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
>> >> index b6cb231..7c80c7c 100644
>> >> --- a/airflow/hooks/druid_hook.py
>> >> +++ b/airflow/hooks/druid_hook.py
>> >> @@ -10,7 +10,7 @@ from airflow.hooks.base_hook import BaseHook
>> >>  from airflow.exceptions import AirflowException
>> >>
>> >>  LOAD_CHECK_INTERVAL = 5
>> >> -
>> >> +TARGET_PARTITION_SIZE = 5000000
>> >>
>> >>  class AirflowDruidLoadException(AirflowException):
>> >>      pass
>> >> @@ -52,13 +52,22 @@ class DruidHook(BaseHook):
>> >>
>> >>      def construct_ingest_query(
>> >>              self, datasource, static_path, ts_dim, columns,
>> metric_spec,
>> >> -            intervals, num_shards,
>> hadoop_dependency_coordinates=None):
>> >> +            intervals, num_shards, target_partition_size,
>> >> hadoop_dependency_coordinates=None):
>> >>          """
>> >>          Builds an ingest query for an HDFS TSV load.
>> >>
>> >>          :param datasource: target datasource in druid
>> >>          :param columns: list of all columns in the TSV, in the right
>> >> order
>> >>          """
>> >> +
>> >> +        # backward compatibility for num_shards, but
>> >> target_partition_size is the default setting
>> >> +        # and overwrites the num_shards
>> >> +        if target_partition_size == -1:
>> >> +            if num_shards == -1:
>> >> +                target_partition_size = TARGET_PARTITION_SIZE
>> >> +        else:
>> >> +            num_shards = -1
>> >> +
>> >>          metric_names = [
>> >>              m['fieldName'] for m in metric_spec if m['type'] !=
>> 'count']
>> >>          dimensions = [c for c in columns if c not in metric_names and
>> c
>> >> != ts_dim]
>> >> @@ -100,7 +109,7 @@ class DruidHook(BaseHook):
>> >>                      },
>> >>                      "partitionsSpec" : {
>> >>                          "type" : "hashed",
>> >> -                        "targetPartitionSize" : -1,
>> >> +                        "targetPartitionSize" : target_partition_size,
>> >>                          "numShards" : num_shards,
>> >>                      },
>> >>                  },
>> >> @@ -121,10 +130,10 @@ class DruidHook(BaseHook):
>> >>
>> >>      def send_ingest_query(
>> >>              self, datasource, static_path, ts_dim, columns,
>> metric_spec,
>> >> -            intervals, num_shards,
>> hadoop_dependency_coordinates=None):
>> >> +            intervals, num_shards, target_partition_size,
>> >> hadoop_dependency_coordinates=None):
>> >>          query = self.construct_ingest_query(
>> >>              datasource, static_path, ts_dim, columns,
>> >> -            metric_spec, intervals, num_shards,
>> >> hadoop_dependency_coordinates)
>> >> +            metric_spec, intervals, num_shards, target_partition_size,
>> >> hadoop_dependency_coordinates)
>> >>          r = requests.post(
>> >>              self.ingest_post_url, headers=self.header, data=query)
>> >>          logging.info(self.ingest_post_url)
>> >> @@ -138,7 +147,7 @@ class DruidHook(BaseHook):
>> >>
>> >>      def load_from_hdfs(
>> >>              self, datasource, static_path,  ts_dim, columns,
>> >> -            intervals, num_shards, metric_spec=None,
>> >> hadoop_dependency_coordinates=None):
>> >> +            intervals, num_shards, target_partition_size,
>> >> metric_spec=None, hadoop_dependency_coordinates=None):
>> >>          """
>> >>          load data to druid from hdfs
>> >>          :params ts_dim: The column name to use as a timestamp
>> >> @@ -146,7 +155,7 @@ class DruidHook(BaseHook):
>> >>          """
>> >>          task_id = self.send_ingest_query(
>> >>              datasource, static_path, ts_dim, columns, metric_spec,
>> >> -            intervals, num_shards, hadoop_dependency_coordinates)
>> >> +            intervals, num_shards, target_partition_size,
>> >> hadoop_dependency_coordinates)
>> >>          status_url = self.get_ingest_status_url(task_id)
>> >>          while True:
>> >>              r = requests.get(status_url)
>> >>
>> >>
>> >>
>> http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b58b5e09/airflow/operators/hive_to_druid.py
>> >> ----------------------------------------------------------------------
>> >> diff --git a/airflow/operators/hive_to_druid.py
>> >> b/airflow/operators/hive_to_druid.py
>> >> index 1346841..420aeed 100644
>> >> --- a/airflow/operators/hive_to_druid.py
>> >> +++ b/airflow/operators/hive_to_druid.py
>> >> @@ -49,7 +49,8 @@ class HiveToDruidTransfer(BaseOperator):
>> >>              metastore_conn_id='metastore_default',
>> >>              hadoop_dependency_coordinates=None,
>> >>              intervals=None,
>> >> -            num_shards=1,
>> >> +            num_shards=-1,
>> >> +            target_partition_size=-1,
>> >>              *args, **kwargs):
>> >>          super(HiveToDruidTransfer, self).__init__(*args, **kwargs)
>> >>          self.sql = sql
>> >> @@ -57,6 +58,7 @@ class HiveToDruidTransfer(BaseOperator):
>> >>          self.ts_dim = ts_dim
>> >>          self.intervals = intervals or ['{{ ds }}/{{ tomorrow_ds }}']
>> >>          self.num_shards = num_shards
>> >> +        self.target_partition_size = target_partition_size
>> >>          self.metric_spec = metric_spec or [{
>> >>              "name": "count",
>> >>              "type": "count"}]
>> >> @@ -103,8 +105,8 @@ class HiveToDruidTransfer(BaseOperator):
>> >>              datasource=self.druid_datasource,
>> >>              intervals=self.intervals,
>> >>              static_path=static_path, ts_dim=self.ts_dim,
>> >> -            columns=columns, num_shards=self.num_shards,
>> >> metric_spec=self.metric_spec,
>> >> -
>> >> hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
>> >> +            columns=columns, num_shards=self.num_shards,
>> >> target_partition_size=self.target_partition_size,
>> >> +            metric_spec=self.metric_spec,
>> >> hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
>> >>          logging.info("Load seems to have succeeded!")
>> >>
>> >>          logging.info(
>> >>
>> >>
>> >
>>
>
>

Mime
View raw message