airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chris Riccomini <criccom...@apache.org>
Subject Re: [1/3] incubator-airflow git commit: use targetPartitionSize as the default partition spec
Date Tue, 24 May 2016 00:07:23 GMT
Hey Dan,

Could you please file JIRAs, and put the JIRA name as the prefix to your
commits?

Cheers,
Chris

On Mon, May 23, 2016 at 5:01 PM, <davydov@apache.org> wrote:

> Repository: incubator-airflow
> Updated Branches:
>   refs/heads/airbnb_rb1.7.1_4 1d0d8681d -> 6f7ea90ae
>
>
> use targetPartitionSize as the default partition spec
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
> Commit:
> http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b58b5e09
> Tree:
> http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b58b5e09
> Diff:
> http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b58b5e09
>
> Branch: refs/heads/airbnb_rb1.7.1_4
> Commit: b58b5e09578d8a0df17b4de12fe3b49792e9feda
> Parents: 1d0d868
> Author: Hongbo Zeng <hongbo.zeng@airbnb.com>
> Authored: Sat May 14 17:00:42 2016 -0700
> Committer: Dan Davydov <dan.davydov@airbnb.com>
> Committed: Mon May 23 16:59:52 2016 -0700
>
> ----------------------------------------------------------------------
>  airflow/hooks/druid_hook.py        | 23 ++++++++++++++++-------
>  airflow/operators/hive_to_druid.py |  8 +++++---
>  2 files changed, 21 insertions(+), 10 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b58b5e09/airflow/hooks/druid_hook.py
> ----------------------------------------------------------------------
> diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
> index b6cb231..7c80c7c 100644
> --- a/airflow/hooks/druid_hook.py
> +++ b/airflow/hooks/druid_hook.py
> @@ -10,7 +10,7 @@ from airflow.hooks.base_hook import BaseHook
>  from airflow.exceptions import AirflowException
>
>  LOAD_CHECK_INTERVAL = 5
> -
> +TARGET_PARTITION_SIZE = 5000000
>
>  class AirflowDruidLoadException(AirflowException):
>      pass
> @@ -52,13 +52,22 @@ class DruidHook(BaseHook):
>
>      def construct_ingest_query(
>              self, datasource, static_path, ts_dim, columns, metric_spec,
> -            intervals, num_shards, hadoop_dependency_coordinates=None):
> +            intervals, num_shards, target_partition_size,
> hadoop_dependency_coordinates=None):
>          """
>          Builds an ingest query for an HDFS TSV load.
>
>          :param datasource: target datasource in druid
>          :param columns: list of all columns in the TSV, in the right order
>          """
> +
> +        # backward compatibility for num_shards, but target_partition_size
> is the default setting
> +        # and overwrites the num_shards
> +        if target_partition_size == -1:
> +            if num_shards == -1:
> +                target_partition_size = TARGET_PARTITION_SIZE
> +        else:
> +            num_shards = -1
> +
>          metric_names = [
>              m['fieldName'] for m in metric_spec if m['type'] != 'count']
>          dimensions = [c for c in columns if c not in metric_names and c
> != ts_dim]
> @@ -100,7 +109,7 @@ class DruidHook(BaseHook):
>                      },
>                      "partitionsSpec" : {
>                          "type" : "hashed",
> -                        "targetPartitionSize" : -1,
> +                        "targetPartitionSize" : target_partition_size,
>                          "numShards" : num_shards,
>                      },
>                  },
> @@ -121,10 +130,10 @@ class DruidHook(BaseHook):
>
>      def send_ingest_query(
>              self, datasource, static_path, ts_dim, columns, metric_spec,
> -            intervals, num_shards, hadoop_dependency_coordinates=None):
> +            intervals, num_shards, target_partition_size,
> hadoop_dependency_coordinates=None):
>          query = self.construct_ingest_query(
>              datasource, static_path, ts_dim, columns,
> -            metric_spec, intervals, num_shards,
> hadoop_dependency_coordinates)
> +            metric_spec, intervals, num_shards, target_partition_size,
> hadoop_dependency_coordinates)
>          r = requests.post(
>              self.ingest_post_url, headers=self.header, data=query)
>          logging.info(self.ingest_post_url)
> @@ -138,7 +147,7 @@ class DruidHook(BaseHook):
>
>      def load_from_hdfs(
>              self, datasource, static_path,  ts_dim, columns,
> -            intervals, num_shards, metric_spec=None,
> hadoop_dependency_coordinates=None):
> +            intervals, num_shards, target_partition_size,
> metric_spec=None, hadoop_dependency_coordinates=None):
>          """
>          load data to druid from hdfs
>          :params ts_dim: The column name to use as a timestamp
> @@ -146,7 +155,7 @@ class DruidHook(BaseHook):
>          """
>          task_id = self.send_ingest_query(
>              datasource, static_path, ts_dim, columns, metric_spec,
> -            intervals, num_shards, hadoop_dependency_coordinates)
> +            intervals, num_shards, target_partition_size,
> hadoop_dependency_coordinates)
>          status_url = self.get_ingest_status_url(task_id)
>          while True:
>              r = requests.get(status_url)
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b58b5e09/airflow/operators/hive_to_druid.py
> ----------------------------------------------------------------------
> diff --git a/airflow/operators/hive_to_druid.py
> b/airflow/operators/hive_to_druid.py
> index 1346841..420aeed 100644
> --- a/airflow/operators/hive_to_druid.py
> +++ b/airflow/operators/hive_to_druid.py
> @@ -49,7 +49,8 @@ class HiveToDruidTransfer(BaseOperator):
>              metastore_conn_id='metastore_default',
>              hadoop_dependency_coordinates=None,
>              intervals=None,
> -            num_shards=1,
> +            num_shards=-1,
> +            target_partition_size=-1,
>              *args, **kwargs):
>          super(HiveToDruidTransfer, self).__init__(*args, **kwargs)
>          self.sql = sql
> @@ -57,6 +58,7 @@ class HiveToDruidTransfer(BaseOperator):
>          self.ts_dim = ts_dim
>          self.intervals = intervals or ['{{ ds }}/{{ tomorrow_ds }}']
>          self.num_shards = num_shards
> +        self.target_partition_size = target_partition_size
>          self.metric_spec = metric_spec or [{
>              "name": "count",
>              "type": "count"}]
> @@ -103,8 +105,8 @@ class HiveToDruidTransfer(BaseOperator):
>              datasource=self.druid_datasource,
>              intervals=self.intervals,
>              static_path=static_path, ts_dim=self.ts_dim,
> -            columns=columns, num_shards=self.num_shards,
> metric_spec=self.metric_spec,
> -
> hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
> +            columns=columns, num_shards=self.num_shards,
> target_partition_size=self.target_partition_size,
> +            metric_spec=self.metric_spec,
> hadoop_dependency_coordinates=self.hadoop_dependency_coordinates)
>          logging.info("Load seems to have succeeded!")
>
>          logging.info(
>
>

Mime
View raw message