madlib-dev mailing list archives

From orhankislal <...@git.apache.org>
Subject [GitHub] incubator-madlib pull request #112: Feature: Add grouping support for PageRa...
Date Wed, 12 Apr 2017 00:06:51 GMT
Github user orhankislal commented on a diff in the pull request:

    https://github.com/apache/incubator-madlib/pull/112#discussion_r111041205
  
    --- Diff: src/ports/postgres/modules/graph/pagerank.py_in ---
    @@ -158,44 +313,198 @@ def pagerank(schema_madlib, vertex_table, vertex_id, edge_table, edge_args,
             # https://en.wikipedia.org/wiki/PageRank#Damping_factor
     
             # The query below computes the PageRank of each node using the above formula.
    +        # A short explanatory note on ignore_group_clause:
    +        # It is used only when grouping is enabled, and holds the condition
    +        # that lets us skip the PageRank computation for groups that have
    +        # already converged.
             plpy.execute("""
                     CREATE TABLE {message} AS
    -                SELECT {edge_temp_table}.{dest} AS {vertex_id},
    -                        SUM({v1}.pagerank/{out_cnts}.{out_cnts_cnt})*{damping_factor}+{random_prob} AS pagerank
    +                SELECT {grouping_cols_select} {edge_temp_table}.{dest} AS {vertex_id},
    +                        SUM({v1}.pagerank/{out_cnts}.{out_cnts_cnt})*{damping_factor}+{random_jump_prob} AS pagerank
                     FROM {edge_temp_table}
    -                    INNER JOIN {cur} ON {edge_temp_table}.{dest}={cur}.{vertex_id}
    -                    INNER JOIN {out_cnts} ON {out_cnts}.{vertex_id}={edge_temp_table}.{src}
    -                    INNER JOIN {cur} AS {v1} ON {v1}.{vertex_id}={edge_temp_table}.{src}
    -                GROUP BY {edge_temp_table}.{dest}
    -            """.format(**locals()))
    +                    INNER JOIN {cur} ON {cur_join_clause}
    +                    INNER JOIN {out_cnts} ON {out_cnts_join_clause}
    +                    INNER JOIN {cur} AS {v1} ON {v1_join_clause}
    +                    {vertices_per_group_inner_join}
    +                {ignore_group_clause}
    +                GROUP BY {grouping_cols_select} {edge_temp_table}.{dest}
    +            """.format(grouping_cols_select=edge_grouping_cols_select+', '
    +                    if grouping_cols else '',
    +                random_jump_prob='MIN({vpg}.{random_prob})'.format(**locals())
    +                    if grouping_cols else random_probability,
    +                vertices_per_group_inner_join="""INNER JOIN {vertices_per_group}
    +                    AS {vpg} ON {vpg_join_clause}""".format(**locals())
    +                    if grouping_cols else '',
    +                ignore_group_clause=' WHERE '+get_ignore_groups(
    +                    summary_table, edge_temp_table, grouping_cols_list)
    +                    if iteration_num>0 and grouping_cols else '',
    +                **locals()))
             # If there are nodes that have no incoming edges, they are not captured in the message table.
             # Insert entries for such nodes, with random_prob.
             plpy.execute("""
                     INSERT INTO {message}
    -                SELECT {vertex_id}, {random_prob}::DOUBLE PRECISION AS pagerank
    -                FROM {cur}
    -                WHERE {vertex_id} NOT IN (
    +                SELECT {grouping_cols_select} {cur}.{vertex_id}, {random_jump_prob} AS pagerank
    +                FROM {cur} {vpg_from_clause}
    +                WHERE {vpg_where_clause} {vertex_id} NOT IN (
                         SELECT {vertex_id}
                         FROM {message}
    +                    {message_grp_where}
                     )
    -            """.format(**locals()))
    -        # Check for convergence will be done as part of grouping support for pagerank:
    -        # https://issues.apache.org/jira/browse/MADLIB-1082. So, the threshold parameter
    -        # is a dummy variable at the moment, the PageRank computation happens for
    -        # {max_iter} number of times.
    +                {ignore_group_clause}
    +                GROUP BY {grouping_cols_select} {cur}.{vertex_id}
    +            """.format(grouping_cols_select=cur_grouping_cols_select+','
    +                    if grouping_cols else '',
    +                vpg_from_clause=', {vertices_per_group} AS {vpg}'.format(**locals())
    +                    if grouping_cols else '',
    +                vpg_where_clause='{vpg_cur_join_clause} AND '.format(**locals())
    +                    if grouping_cols else '',
    +                message_grp_where='WHERE {message_grp}'.format(**locals())
    +                    if grouping_cols else '',
    +                random_jump_prob='MIN({vpg}.{random_prob})'.format(**locals())
    +                    if grouping_cols else random_probability,
    +                ignore_group_clause=' AND '+get_ignore_groups(
    +                    summary_table, cur, grouping_cols_list)
    +                    if iteration_num>0 and grouping_cols else '',
    +                **locals()))
    +
    +        # Check for convergence:
    +        ## Check for convergence only if threshold != 0.
    +        if threshold != 0:
    +            # message_unconv and cur_unconv will contain the unconverged groups
    +            # after the current and previous iterations, respectively. Groups that
    +            # are missing in message_unconv but appear in cur_unconv are the
    +            # groups that have converged after this iteration's computations.
    +            # If no grouping columns are specified, then we check if there is
    +            # at least one unconverged node (limit 1 is used in the query).
    +            plpy.execute("""
    +                    CREATE TEMP TABLE {message_unconv} AS
    +                    SELECT {grouping_cols_select}
    +                    FROM {message}
    +                    INNER JOIN {cur}
    +                    ON {cur}.{vertex_id}={message}.{vertex_id}
    +                    WHERE {message_grp_clause}
    +                        ABS({cur}.pagerank-{message}.pagerank) > {threshold}
    +                    {ignore_group_clause}
    +                    {group_by_grouping_cols}
    +                    {limit}
    +                """.format(grouping_cols_select=cur_grouping_cols_select
    +                        if grouping_cols else '{0}.{1}'.format(cur, vertex_id),
    +                    group_by_grouping_cols=' GROUP BY {0}'.format(cur_grouping_cols_select)
    +                        if grouping_cols else '',
    +                    message_grp_clause='{0} AND '.format(message_grp)
    +                        if grouping_cols else '',
    +                    ignore_group_clause=' AND '+get_ignore_groups(summary_table, cur,
    +                        grouping_cols_list) if iteration_num>0 and grouping_cols else '',
    +                    limit='' if grouping_cols else ' LIMIT 1 ',
    +                    **locals()))
    +            unconverged = plpy.execute("""SELECT COUNT(*) AS cnt FROM {0}
    +                """.format(message_unconv))[0]["cnt"]
    +            if iteration_num > 0 and grouping_cols:
    +                # Update result and summary tables for groups that have converged
    +                # since the last iteration.
    +                update_result_tables(temp_summary_table, iteration_num,
    +                    summary_table, out_table, message, grouping_cols_list,
    +                    cur_unconv, message_unconv)
    +            plpy.execute("""
    +                DROP TABLE IF EXISTS {cur_unconv};
    +                ALTER TABLE {message_unconv} RENAME TO {cur_unconv}
    +                """.format(**locals()))
    +        else:
    +            # Do not run convergence test if threshold=0, since that implies
    +            # the user wants to run max_iter iterations.
    +            unconverged = 1
             plpy.execute("""
    -                DROP TABLE IF EXISTS {cur};
    -                ALTER TABLE {message} RENAME TO {cur}
    -            """.format(**locals()))
    +                    DROP TABLE IF EXISTS {cur};
    +                    ALTER TABLE {message} RENAME TO {cur}
    +                """.format(**locals()))
    +        if unconverged == 0:
    +            break
     
    -    plpy.execute("ALTER TABLE {cur} RENAME TO {out_table}".format(**locals()))
    +    # If there are still unconverged groups (or the entire table, when no grouping is used), update the results.
    +    if grouping_cols:
    +        if unconverged > 0:
    +            if threshold != 0:
    +                # We completed max_iters, but there are still some unconverged groups
    +                # Update the result and summary tables for unconverged groups.
    +                update_result_tables(temp_summary_table, iteration_num,
    +                    summary_table, out_table, cur, grouping_cols_list, cur_unconv)
    +            else:
    +                # No group has converged. The list of all group values is in
    +                # distinct_grp_table.
    +                update_result_tables(temp_summary_table, iteration_num,
    +                    summary_table, out_table, cur, grouping_cols_list, distinct_grp_table)
    +    else:
    +        plpy.execute("""ALTER TABLE {table_name} RENAME TO {out_table}
    +            """.format(table_name=cur, **locals()))
    +        plpy.execute("""
    +                INSERT INTO {summary_table} VALUES
    +                ({iteration_num}+1);
    +            """.format(**locals()))
     
         ## Step 4: Cleanup
    -    plpy.execute("""
    -        DROP TABLE IF EXISTS {0},{1},{2},{3};
    -        """.format(out_cnts, edge_temp_table, cur, message))
    +    plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2},{3},{4},{5};
    +        """.format(out_cnts, edge_temp_table, cur, message, cur_unconv,
    +                    message_unconv))
    +    if grouping_cols:
    +        plpy.execute("""DROP TABLE IF EXISTS {0},{1},{2};
    +            """.format(vertices_per_group, temp_summary_table,
    +                distinct_grp_table))
         plpy.execute("SET client_min_messages TO %s" % old_msg_level)
     
    +def update_result_tables(temp_summary_table, i, summary_table, out_table,
    +    res_table, grouping_cols_list, cur_unconv, message_unconv=None):
    +    """
    +        This function updates the summary and output tables only for those
    +        groups that have converged. These are found by looking at groups
    +        that appear in cur_unconv but not in message_unconv.
    +        If this function is called after max_iter is completed without
    +        convergence, all the unconverged groups from cur_unconv are used
    +        (note that message_unconv is renamed to cur_unconv before checking
    +        for unconverged == 0 in the pagerank function's for loop).
    +    """
    +    if message_unconv is None:
    +        plpy.execute("""
    +            DROP TABLE IF EXISTS {temp_summary_table};
    +            CREATE TABLE {temp_summary_table} AS
    --- End diff --
    
    We can overwrite the temp_summary_table variable instead of creating a new table (as long as we make sure not to drop it at the end).
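
    For illustration, here is a rough sketch of that suggestion, assuming the truncated
    CREATE TABLE simply copies cur_unconv. The body below is hypothetical and only
    restates the idea; it is not the code in this PR.

        def update_result_tables(temp_summary_table, i, summary_table, out_table,
                                 res_table, grouping_cols_list, cur_unconv,
                                 message_unconv=None):
            if message_unconv is None:
                # Re-point the Python variable at the existing table of unconverged
                # groups instead of DROP + CREATE TABLE ... AS a fresh copy.
                # The caller's cleanup must then avoid dropping this name, since it
                # now refers to a table that is managed elsewhere.
                temp_summary_table = cur_unconv
            else:
                # Build the converged-group list as in the patch (groups present in
                # cur_unconv but missing from message_unconv).
                pass  # unchanged from the patch
            # ... the rest of the function proceeds with temp_summary_table as before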


