From: orhankislal
To: dev@madlib.incubator.apache.org
Subject: [GitHub] incubator-madlib pull request #54: Pivoting: Phase 2
Date: Fri, 29 Jul 2016 21:09:01 +0000 (UTC)

Github user orhankislal commented on a diff in the pull request:

    https://github.com/apache/incubator-madlib/pull/54#discussion_r72861263

--- Diff: src/ports/postgres/modules/utilities/pivot.py_in ---
@@ -58,66 +61,256 @@ def pivot(schema_madlib, source_table, out_table,
                                   pivoted table
         @param aggregate_func     The aggregate function to be applied to
                                   the values
+        @param fill_value         If specified, determines how to fill NULL
+                                  values resulting from pivot operation
+        @param keep_null          The flag for determining how to handle NULL
+                                  values in pivot columns
     """
-    """ Assume we have the following table
         pivset( id INTEGER, piv FLOAT8, val FLOAT8 )
-        where the piv column has 3 distinct values (10.0, 20.0 and 30.0).
+        where the piv column has 3 distinct values (10, 20 and 30).
         If the pivot function call is :
         SELECT madlib.pivot('pivset', 'pivout', 'id', 'piv', 'val');
         We want to construct the following sql code to pivot the table.
         CREATE TABLE pivout AS (SELECT id,
-            sum(CASE WHEN "piv" = '10.0' THEN val ELSE NULL END ) as "piv_10.0",
-            sum(CASE WHEN "piv" = '20.0' THEN val ELSE NULL END ) as "piv_20.0",
-            sum(CASE WHEN "piv" = '30.0' THEN val ELSE NULL END ) as "piv_30.0"
+            avg(CASE WHEN "piv" = '10' THEN val ELSE NULL END ) as "val_avg_piv_10",
+            avg(CASE WHEN "piv" = '20' THEN val ELSE NULL END ) as "val_avg_piv_20",
+            avg(CASE WHEN "piv" = '30' THEN val ELSE NULL END ) as "val_avg_piv_30"
         FROM pivset GROUP BY id ORDER BY id)
     """
+
+    # If there are more than 1000 columns for the output table, we give a
+    # warning as it might give an error.
+    MAX_OUTPUT_COLUMN_COUNT = 1000
+
+    # If a column name has more than 63 characters it gets trimmed automaticly,
+    # which may cause an exception. We enable the output dictionary in this case
+    MAX_COLUMN_LENGTH = 63
+
     indices = split_quoted_delimited_str(index)
-    pcol = split_quoted_delimited_str(pivot_cols)
-    pval = split_quoted_delimited_str(pivot_values)
-    validate_pivot_coding(source_table, out_table, indices, pcol, pval)
-    new_col_names =[]
-    sql_list = ["CREATE TABLE " + out_table + " AS (SELECT " + index]
+    pcols = split_quoted_delimited_str(pivot_cols)
+    pvals = split_quoted_delimited_str(pivot_values)
+    validate_pivot_coding(source_table, out_table, indices, pcols, pvals)
+
+    # Strip end quotes from pivot columns
+    pcols = [strip_end_quotes(pcol.strip()) for pcol in pcols]
+    pvals = [strip_end_quotes(pval.strip()) for pval in pvals]
+
+    # Parse the aggregate_func as a dictionary
+    try:
+        param_types = dict.fromkeys(pvals, list)
+        agg_dict = extract_keyvalue_params(aggregate_func,param_types)
+    except KeyError, e:
+        with MinWarning("warning"):
+            plpy.warning("Pivot: Not all columns from '{aggregate_func}'"
+                         " present in '{pivot_values}'".format(**locals()))
+        raise
+
+    # If the dictionary is empty, parse it as a list
+    agg_set = split_quoted_delimited_str(aggregate_func) if not agg_dict else []
-    pcol_no_quotes = strip_end_quotes(pcol[0].strip())
-    pval_no_quotes = strip_end_quotes(pval[0].strip())
+    # __madlib_pivot_def_agg__ denotes the aggregate function(s) if the user
+    # does not specify a value: aggregate dictionary
+    # If no aggregates are given, set average as default
+    agg_dict['__madlib_pivot_def_agg__'] = ['avg'] if not agg_set else agg_set
     # Find the distinct values of pivot_cols
-    distinct_values = plpy.execute(
-        "SELECT array_agg(DISTINCT {pcol} ORDER BY {pcol}) AS value "
-        "FROM {source_table}".
-        format(pcol=pcol[0], source_table=source_table))
-
-    distinct_values = [strip_end_quotes(item)
-                       for item in distinct_values[0]['value']]
-    # The aggregate collects pivot_values values for a given pivot_cols value
-    case_str = ("{agg}("
-                "CASE WHEN \"{{pcol}}\" = '{{value}}' THEN {pval} ELSE NULL END"
-                ")".
-                format(agg=aggregate_func,
-                       pval=pval_no_quotes))
-    sql_list.append(
-        ", " +
-        # Assign the name of the new column
-        ', '.join("{case_str} as \"{{pcol}}_{{value}}\"".
-                  format(case_str=case_str).
-                  format(pcol=pcol_no_quotes, value=str(value))
-                  for value in distinct_values if value is not None))
+    # Note that the distinct values are appended in order of users list
+    # This ordering is important when we access pivot_comb entries
+    array_agg_str = ', '.join("array_agg(DISTINCT {pcol}) AS {pcol}_values".
+                              format(pcol=pcol) for pcol in pcols)
+
+    null_str = ""
+    if keep_null:
+        # Create an additional column for every pivot column
+        # If there is a null value, this column will get True
+        null_str = ","+', '.join(
+            "bool_or(CASE WHEN {pcol} IS NULL THEN True END)"
+            "AS {pcol}_isnull".format(pcol=pcol) for pcol in pcols)
+
+    distinct_values = plpy.execute("SELECT {0} {1} FROM {2}".
+                                   format(array_agg_str, null_str, source_table))
+
+    # Collect the distinct values for every pivot column
+    pcol_distinct_values = {}
+    pcol_max_length = 0
+    for pcol in pcols:
+        # Read the distinct values for this pcol
+        pcol_tmp = [item for item in distinct_values[0][pcol+"_values"]]
+        # Remove null values if keep null is not true
+        if not keep_null:
+            pcol_tmp = [x for x in pcol_tmp if x is not None]
+        elif distinct_values[0][pcol+"_isnull"] and None not in pcol_tmp:
+            pcol_tmp.append(None)
+
+        pcol_distinct_values[pcol]=sorted(pcol_tmp)
+        # Max length of the string that pcol values can create +
+        # length of pcol + 1 (for _ character)
+        pcol_max_length+=max([len(str(item)) for item in pcol_tmp])+len(pcol)+1
+
+    # Create the combination of every possible pivot column
+    # Assume piv and piv2 are pivot columns. piv=(1,2) and piv2=(3,4,5)
+    # pivot_comb = ((1,3),(1,4),(1,5),(2,3),(2,4),(2,5))
+    pivot_comb = list(itertools.product(*([pcol_distinct_values[pcol]
+                                           for pcol in pcols])))
+    #Prepare the wrapper for fill value
+    fill_str_begin = ""
+    fill_str_end = ""
+    if fill_value is not None:
+        fill_str_begin = " COALESCE("
+        fill_str_end = ", "+fill_value+" ) "
+
+    # Check the max possible length of a output column name
+    # If it is over 63 (psql upper limit) create table lookup
+    for pval in pvals:
+
+        col_name_len = pcol_max_length+len(pval)+1
+        try:
+            # If user specifies a list of aggregates for a value column
+            # Every value column has to have an entry
+            agg_func = agg_dict[pval] if len(agg_dict) > 1 else \
--- End diff --

I know it looks a bit overcomplicated; that is why I added the comments. Your suggestion will look much cleaner. The only issue is that we have to create a map from every value column to the same aggregate(s) when the user does not supply one. It shouldn't be that costly, since 1600 columns is still the upper limit.
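For context, a minimal sketch of the map described above: when the user supplies a plain list of aggregates rather than a per-column dictionary, every value column is simply pointed at that same list. The helper name build_agg_map and the parsing details below are illustrative assumptions, not the actual MADlib implementation.

    def build_agg_map(pvals, aggregate_func):
        # Hypothetical helper (not MADlib code): map every value column to
        # its list of aggregate functions.
        aggregate_func = (aggregate_func or 'avg').strip()
        if '=' in aggregate_func:
            # Per-column form, e.g. "val=avg, val2=sum": keep the user's mapping
            return dict((k.strip(), [v.strip()])
                        for k, v in (item.split('=')
                                     for item in aggregate_func.split(',')))
        # List form, e.g. "avg, sum": every value column shares the same list
        shared = [agg.strip() for agg in aggregate_func.split(',')]
        return dict((pval, shared) for pval in pvals)

    # Example:
    #   build_agg_map(['val', 'val2'], 'avg, sum')
    #   -> {'val': ['avg', 'sum'], 'val2': ['avg', 'sum']}
    #   build_agg_map(['val', 'val2'], 'val=avg, val2=sum')
    #   -> {'val': ['avg'], 'val2': ['sum']}

Building such a map touches each value column once, so even at the 1600-column ceiling the extra work is negligible.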