From dev-return-2369-archive-asf-public=cust-asf.ponee.io@datafu.apache.org Tue Jul 16 22:04:40 2019
Reply-To: dev@datafu.apache.org
From: GitBox
To: dev@datafu.apache.org
Subject: [GitHub] [datafu] rjurney commented on a change in pull request #15: Add Spark functionality to DataFu, datafu-spark
Message-ID: <156331467413.25587.6462470334419164333.gitbox@gitbox.apache.org>
Date: Tue, 16 Jul 2019 22:04:34 -0000

rjurney commented on a change in pull request #15: Add Spark functionality to DataFu, datafu-spark
URL: https://github.com/apache/datafu/pull/15#discussion_r304139883

##########
File path: datafu-spark/src/main/resources/pyspark_utils/df_utils.py
##########

@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pyspark
+from pyspark.sql import DataFrame
+from pyspark_utils.bridge_utils import _getjvm_class
+
+
+def _get_utils(df):
+    _gateway = df._sc._gateway
+    return _getjvm_class(_gateway, "datafu.spark.SparkDFUtilsBridge")
+
+
+# public:
+
+
+def dedup(df, group_col, order_cols = []):
+    """
+    Used to get the 'latest' record (after ordering according to the provided order columns) in each group.
+    :param df: DataFrame to operate on
+    :param group_col: column to group the records by
+    :param order_cols: columns to order the records according to
+    :return: DataFrame representing the data after the operation
+    """
+    java_cols = _cols_to_java_cols(order_cols)
+    jdf = _get_utils(df).dedup(df._jdf, group_col._jc, java_cols)
+    return DataFrame(jdf, df.sql_ctx)
+
+
+def dedup_top_n(df, n, group_col, order_cols = []):
+    """
+    Used to get the top N records (after ordering according to the provided order columns) in each group.
+    :param df: DataFrame to operate on
+    :param n: number of records to return from each group
+    :param group_col: column to group the records by
+    :param order_cols: columns to order the records according to
+    :return: DataFrame representing the data after the operation
+    """
+    java_cols = _cols_to_java_cols(order_cols)
+    jdf = _get_utils(df).dedupTopN(df._jdf, n, group_col._jc, java_cols)
+    return DataFrame(jdf, df.sql_ctx)
+
+
+def dedup2(df, group_col, order_by_col, desc = True, columns_filter = [], columns_filter_keep = True):
+    """
+    Used to get the 'latest' record (after ordering according to the provided order columns) in each group.
+    :param df: DataFrame to operate on
+    :param group_col: column to group the records by
+    :param order_by_col: column to order the records according to
+    :param desc: have the order as desc
+    :param columns_filter: columns to filter
+    :param columns_filter_keep: indicates whether we should filter the selected columns 'out' or alternatively
+        keep only those columns in the result
+    :return: DataFrame representing the data after the operation
+    """
+    jdf = _get_utils(df).dedup2(df._jdf, group_col._jc, order_by_col._jc, desc, columns_filter, columns_filter_keep)
+    return DataFrame(jdf, df.sql_ctx)
+
+
+def change_schema(df, new_scheme = []):
+    """
+    Returns a DataFrame with the column names renamed to the column names in the new schema.
+    :param df: DataFrame to operate on
+    :param new_scheme: new column names
+    :return: DataFrame representing the data after the operation
+    """
+    jdf = _get_utils(df).changeSchema(df._jdf, new_scheme)
+    return DataFrame(jdf, df.sql_ctx)
+
+
+def join_skewed(df_left, df_right, join_exprs, num_shards = 30, join_type="inner"):
+    """
+    Used to perform a join when the right df is relatively small but still too large for a broadcast join.
+    Use cases:
+    a. excluding keys that might be skewed from a medium-sized list.
+    b. joining a big skewed table with a table that has a small number of very big rows.
+    :param df_left: left DataFrame
+    :param df_right: right DataFrame
+    :param join_exprs: join expression
+    :param num_shards: number of shards
+    :param join_type: join type
+    :return: DataFrame representing the data after the operation
+    """
+    jdf = _get_utils(df_left).joinSkewed(df_left._jdf, df_right._jdf, join_exprs._jc, num_shards, join_type)
+    return DataFrame(jdf, df_left.sql_ctx)
+
+
+def broadcast_join_skewed(not_skewed_df, skewed_df, join_col, number_of_custs_to_broadcast):
+    """
+    Suitable for performing a join when one DF is skewed and the other is not.
+    Splits both of the DFs into two parts according to the skewed keys.
+    1. Map-join: broadcasts the skewed-keys part of the not-skewed DF to the skewed-keys part of the skewed DF.
+    2. Regular join: between the remaining two parts.
+    :param not_skewed_df: not skewed DataFrame
+    :param skewed_df: skewed DataFrame
+    :param join_col: join column
+    :param number_of_custs_to_broadcast: number of custs to broadcast
+    :return: DataFrame representing the data after the operation
+    """
+    jdf = _get_utils(skewed_df).broadcastJoinSkewed(not_skewed_df._jdf, skewed_df._jdf, join_col, number_of_custs_to_broadcast)
+    return DataFrame(jdf, not_skewed_df.sql_ctx)
+
+
+def join_with_range(df_single, col_single, df_range, col_range_start, col_range_end, decrease_factor):
+    """
+    Helper function to join a table with a column to a table with a range of the same column.
+    For example, an ip table with whois data that has ranges of ips as rows.
+    The main problem this handles is that a naive explode on the range can result in a huge table.
+    Requires:
+    1. The single table to be distinct on the join column, because there could be a few corresponding ranges, so we
+       dedup at the end - we choose the minimal range.
+    2. The range and single columns to be numeric.
+    """
+    jdf = _get_utils(df_single).joinWithRange(df_single._jdf, col_single, df_range._jdf, col_range_start, col_range_end, decrease_factor)
+    return DataFrame(jdf, df_single.sql_ctx)
+
+
+def join_with_range_and_dedup(df_single, col_single, df_range, col_range_start, col_range_end, decrease_factor, dedup_small_range):
+    """
+    Helper function to join a table with a column to a table with a range of the same column.
+    For example, an ip table with whois data that has ranges of ips as rows.
+    The main problem this handles is that a naive explode on the range can result in a huge table.
+    Requires:
+    1. The single table to be distinct on the join column, because there could be a few corresponding ranges, so we
+       dedup at the end - we choose the minimal range.
+    2. The range and single columns to be numeric.
+    """
+    jdf = _get_utils(df_single).joinWithRangeAndDedup(df_single._jdf, col_single, df_range._jdf, col_range_start, col_range_end, decrease_factor, dedup_small_range)
+    return DataFrame(jdf, df_single.sql_ctx)
+
+
+def _cols_to_java_cols(cols):
+    return _map_if_needed(lambda x: x._jc, cols)
+
+
+def _dfs_to_java_dfs(dfs):
+    return _map_if_needed(lambda x: x._jdf, dfs)
+
+
+def _map_if_needed(func, itr):
+    return map(func, itr) if itr is not None else itr
+
+
+def activate():
+    """Activate integration between datafu-spark and PySpark.
+    This function only needs to be called once.
+
+    This technique is taken from pymongo_spark:
+    https://github.com/mongodb/mongo-hadoop/blob/master/spark/src/main/python/pymongo_spark.py
+    """
+    pyspark.sql.DataFrame.dedup = dedup

Review comment:
   Awesome!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

With regards,
Apache Git Services
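[Editor's note] For readers following this thread, below is a minimal usage sketch of the df_utils API under review. It is not part of the patch. It assumes the datafu-spark jar is on the Spark driver classpath (so the datafu.spark.SparkDFUtilsBridge JVM class resolves), that the pyspark_utils package from this patch is on the Python path, and it uses a made-up "events" DataFrame; the column names user_id, ts, and amount are illustrative only.

# Illustrative sketch only - not part of the patch above.
# Assumes datafu-spark is on the driver classpath and pyspark_utils is importable.
from pyspark.sql import SparkSession
from pyspark_utils import df_utils

spark = SparkSession.builder.appName("df_utils_example").getOrCreate()

# Made-up sample data: one row per (user_id, ts) event.
events = spark.createDataFrame(
    [("a", 1, 10.0), ("a", 2, 20.0), ("b", 1, 5.0)],
    ["user_id", "ts", "amount"])

# Keep the 'latest' record per user_id, ordered by ts
# (mirrors the dedup() docstring in the diff).
latest = df_utils.dedup(events, group_col=events.user_id, order_cols=[events.ts])
latest.show()

# After activate(), the same function is available as a DataFrame method.
df_utils.activate()
events.dedup(group_col=events.user_id, order_cols=[events.ts]).show()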