madlib-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From njayaram2 <...@git.apache.org>
Subject [GitHub] incubator-madlib pull request #113: Graph: Add grouping support to SSSP
Date Mon, 10 Apr 2017 23:46:36 GMT
Github user njayaram2 commented on a diff in the pull request:

    https://github.com/apache/incubator-madlib/pull/113#discussion_r110786784
  
    --- Diff: src/ports/postgres/modules/graph/sssp.py_in ---
    @@ -198,110 +378,228 @@ def graph_sssp(schema_madlib, vertex_table, vertex_id, edge_table,
     			# values.
     
     			sql = (""" INSERT INTO {newupdate}
    -				SELECT DISTINCT ON (message.id) message.id AS id,
    -					message.val AS val,
    -					message.parent AS parent
    +				SELECT DISTINCT ON (message.id {comma_grp})
    +					message.id AS id,
    +					message.{weight} AS {weight},
    +					message.parent AS parent {comma_grp_m}
     				FROM {out_table} AS out_table INNER JOIN
     					(
    -						SELECT edge_table.{dest} AS id, x.val AS val,
    -							oldupdate.id AS parent
    +					SELECT {edge_table}.{dest} AS id, x.{weight} AS {weight},
    +						oldupdate.id AS parent {comma_grp_e}
    +					FROM {oldupdate} AS oldupdate INNER JOIN
    +						{edge_table}  ON
    +							({edge_table}.{src} = oldupdate.id {checkg_eo})
    +						INNER JOIN
    +						(
    +						SELECT {edge_table}.{dest} AS id,
    +							min(oldupdate.{weight} +
    +								{edge_table}.{weight}) AS {weight} {comma_grp_e}
     						FROM {oldupdate} AS oldupdate INNER JOIN
    -							{edge_table} AS edge_table ON
    -							(edge_table.{src} = oldupdate.id) INNER JOIN
    -							(
    -								SELECT edge_table.{dest} AS id,
    -									min(oldupdate.val + edge_table.{weight})
    -									AS val
    -								FROM {oldupdate} AS oldupdate INNER JOIN
    -									{edge_table} AS edge_table ON
    -									(edge_table.{src}=oldupdate.id)
    -								GROUP BY edge_table.{dest}
    -							) x ON (edge_table.{dest} = x.id)
    -						WHERE ABS(oldupdate.val + edge_table.{weight} - x.val)
    -							< {EPSILON}
    -					) AS message ON (message.id = out_table.{vertex_id})
    -				WHERE message.val<out_table.{weight}
    +							{edge_table}  ON
    +							({edge_table}.{src}=oldupdate.id {checkg_eo})
    +						GROUP BY {edge_table}.{dest} {comma_grp_e}
    +						) x
    +						ON ({edge_table}.{dest} = x.id {checkg_ex} )
    +					WHERE ABS(oldupdate.{weight} + {edge_table}.{weight}
    +								- x.{weight}) < {EPSILON}
    +					) message
    +					ON (message.id = out_table.{vertex_id} {checkg_om})
    +				WHERE message.{weight}<out_table.{weight}
     				""".format(**locals()))
     
    -			# If there are no updates, SSSP is finalized
    -			ret = plpy.execute(sql)
    -			if ret.nrows() == 0:
    -				break
    +			plpy.execute(sql)
     
    -			# Swap the update tables for the next iteration
    +			# Swap the update tables for the next iteration.
     			tmp = oldupdate
     			oldupdate = newupdate
     			newupdate = tmp
     
    -		# Bellman-Ford should converge in |V|-1 iterations.
    +		# The algorithm should converge in less than |V| iterations.
    +		# Otherwise there is a negative cycle in the graph.
     		if i == v_cnt:
    -			plpy.execute("DROP TABLE IF EXISTS {out_table}".format(**locals()))
    -			plpy.error("Graph SSSP: Detected a negative cycle in the graph.")
    -
    -		m4_ifdef(<!__HAWQ__!>,
    -			plpy.execute("DROP TABLE {temp_table} ".format(**locals())), <!''!>)
    +			if grouping_cols is None:
    +				plpy.execute("DROP TABLE IF EXISTS {out_table}".
    +					format(**locals()))
    +				plpy.error("Graph SSSP: Detected a negative cycle in the graph.")
    +
    +			# It is possible that not all groups has negative cycles.
    +			else:
    +
    +				# gsql is the string created by collating grouping columns.
    +				# By looking at the oldupdate table we can see which groups
    +				# are in a negative cycle.
    +
    +				negs = plpy.execute(
    +					""" SELECT array_agg(DISTINCT ({grouping_cols})) AS grp
    +						FROM {oldupdate}
    +					""".format(**locals()))[0]['grp']
    +
    +				# Delete the groups with negative cycles from the output table.
    +				sql_del = """ DELETE FROM {out_table}
    +					USING {oldupdate} AS oldupdate
    +					WHERE {checkg_oo_sub}"""
    +				if is_hawq:
    +					sql_del = """
    +						TRUNCATE TABLE {temp_table};
    +						INSERT INTO {temp_table}
    +							SELECT *
    +							FROM {out_table}
    +							WHERE NOT EXISTS(
    +								SELECT 1
    +								FROM {oldupdate} as oldupdate
    +								WHERE {checkg_oo_sub}
    +								);
    +						DROP TABLE {out_table};
    +						ALTER TABLE {temp_table} RENAME TO {out_table};"""
    +
    +				plpy.execute(sql_del.format(**locals()))
    +
    +				# If every group has a negative cycle,
    +				# drop the output table as well.
    +				if table_is_empty(out_table):
    +					plpy.execute("DROP TABLE IF EXISTS {0},{1}".
    +						format(out_table,out_table+"_summary"))
    +
    +				plpy.warning(
    +					"""Graph SSSP: Detected a negative cycle in the """ +
    +					"""sub-graphs of following groups: {0}.""".
    +					format(str(negs)[1:-1]))
    +
    +		if is_hawq:
    +			plpy.execute("DROP TABLE IF EXISTS {temp_table} ".
    +				format(**locals()))
     
     	return None
     
    -def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, **kwargs):
    +def _path_helper(plan_name, dest_vertex, path_table, gset):
     	"""
    -	Helper function that can be used to get the shortest path for a vertex
    -    Args:
    -    	@param source_table	Name of the table that contains the SSSP output.
    -        @param out_table	The vertex that will be the destination of the
    -            				desired path.
    +	Helper function for graph_sssp_get_path. Invoked once per group.
    +	@param plan_name    Name of the plan that finds the parent of a given
    +	                    vertex.
    +    @param dest_vertex  The vertex that will be the destination of the
    +                        desired path.
    +    @param path_table   Name of the output table that will contain the path(s).
    +    @param gset	        List of grouping columns values followed by a comma.
    +                        Empty string if there are no grouping columns.
     	"""
     
    -	validate_get_path(sssp_table, dest_vertex)
    +	ret = [str(dest_vertex)]
     	cur = dest_vertex
    -	cols = get_cols(sssp_table)
    -	id = cols[0]
    -	ret = [dest_vertex]
    -	plan_name = unique_string(desp='plan')
    -
    -	# Follow the 'parent' chain until you reach the source.
    -	# We don't need to know what the source is since it is the only vertex with
    -	# itself as its parent
    -	plpy.execute(""" PREPARE {plan_name} (int) AS
    -		SELECT parent FROM {sssp_table} WHERE {id} = $1 LIMIT 1
    -		""".format(**locals()))
     	sql = "EXECUTE {plan_name} ({cur})"
     	parent = plpy.execute(sql.format(**locals()))
    +	if parent.nrows() > 0:
    +		while 1:
    +			parent = parent[0]['parent']
    +			if parent == cur:
    +				ret.reverse()
    +				ret = ",".join(ret)
    +				plpy.execute(
    +					""" INSERT INTO {path_table} VALUES ({gset} '{ret}')
    +					""".format(**locals()))
    +				return True
    +			else:
    +				ret.append(str(parent))
    +				cur = parent
    +
    +			parent = plpy.execute(sql.format(**locals()))
    +
    +	return False
    +
    +def graph_sssp_get_path(schema_madlib, sssp_table, dest_vertex, path_table,
    +	**kwargs):
    +	"""
    +    Helper function that can be used to get the shortest path for a vertex
    +    Args:
    +        @param source_table Name of the table that contains the SSSP output.
    +        @param out_table    The vertex that will be the destination of the
    +                            desired path.
    +        @param path_table   Name of the output table that contains the path.
    +	"""
    +	with MinWarning("warning"):
    +
    +		_validate_get_path(sssp_table, dest_vertex)
    +		plan_name = unique_string(desp='plan')
    +
    +		select_grps = ""
    +		gsql = ""
    +
    +		path_flag = False
    +
    +		summary = plpy.execute("SELECT * FROM {0}_summary".format(sssp_table))
    +		vertex_id = summary[0]['vertex_id']
    +		if vertex_id == "NULL":
    +			vertex_id = "id"
    +
    +		grouping_cols = summary[0]['grouping_cols']
    +		if grouping_cols != "NULL":
    +			glist = split_quoted_delimited_str(grouping_cols)
    +			select_grps = _grp_from_table(sssp_table,glist) + " , "
    +
    +		plpy.execute("""
    +			CREATE TABLE {path_table} AS
    +				SELECT {select_grps} ''::text as path
    +				FROM {sssp_table}
    +				LIMIT 0
    +			""".format(**locals()))
    +
    +		plan = """ PREPARE {plan_name} (int) AS
    +			SELECT parent FROM {sssp_table}
    +			WHERE {vertex_id} = $1 AND parent IS NOT NULL {gsql} LIMIT 1
    +			"""
    +
    +		if grouping_cols == "NULL":
    +			plpy.execute(plan.format(**locals()))
    +			path_flag = _path_helper(plan_name,dest_vertex,path_table,"")
     
    -	if parent.nrows() == 0:
    -		plpy.error(
    -			"Graph SSSP: Vertex {0} is not present in the sssp table {1}".
    -			format(dest_vertex,sssp_table))
    -
    -	while 1:
    -		parent = parent[0]['parent']
    -		if parent == cur:
    -			ret.reverse()
    -			return ret
     		else:
    -			ret.append(parent)
    -			cur = parent
    -		parent = plpy.execute(sql.format(**locals()))
    +			gvalues = plpy.execute(
    +				""" SELECT {grouping_cols} FROM {sssp_table}
    +					GROUP BY {grouping_cols}""".format(**locals()))
    +
    +			for i in gvalues:
    +				cur = dest_vertex
    +				gcheck = []
    +				gset = []
    +				for j in glist:
    +					gcheck.append("{0} = {1}".format(j,i[j]))
    +					gset.append("{0}".format(i[j]))
    +				gsql = " AND " + " AND ".join(gcheck)
    --- End diff --
    
    Try using a list comprehension here as well; it is generally
    faster than building the list with repeated append() calls.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact the infrastructure team at infrastructure@apache.org or file a
JIRA ticket with INFRA.
---

Mime
View raw message