flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [31/50] [abbrv] flink git commit: [FLINK-1201] [gelly] create similar users graph
Date Wed, 11 Feb 2015 10:49:33 GMT
[FLINK-1201] [gelly] create similar users graph


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ff94da2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ff94da2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ff94da2

Branch: refs/heads/master
Commit: 8ff94da2ef4b56733127781202393d4abbbe7d0f
Parents: 3cef210
Author: vasia <vasilikikalavri@gmail.com>
Authored: Mon Jan 5 17:16:51 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 11 10:46:15 2015 +0100

----------------------------------------------------------------------
 .../flink/graph/example/MusicProfiles.java      | 145 +++++++++++--------
 1 file changed, 88 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8ff94da2/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
index cd425a3..d74e339 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -1,9 +1,11 @@
 package flink.graphs.example;
 
-import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -15,85 +17,114 @@ import org.apache.flink.util.Collector;
 
 import flink.graphs.Edge;
 import flink.graphs.EdgeDirection;
-import flink.graphs.EdgesFunction;
+import flink.graphs.EdgesFunctionWithVertexValue;
 import flink.graphs.Graph;
+import flink.graphs.Vertex;
 import flink.graphs.utils.Tuple3ToEdgeMap;
 
 public class MusicProfiles implements ProgramDescription {
 
-    @SuppressWarnings("serial")
 	public static void main (String [] args) throws Exception {
     	
     	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-    	/** read the user-song-play triplets
+    	/** 
+    	 *  Read the user-song-play triplets
     	 *  The format is <userID>\t<songID>\t<playcount>
     	 */
     	DataSet<Tuple3<String, String, Integer>> triplets = env.readCsvFile(args[0])
     			.lineDelimiter("\n").fieldDelimiter('\t').types(String.class, String.class, Integer.class);
     	
     	/**
-    	 *  read the mismatches dataset and extract the songIDs
+    	 *  Read the mismatches dataset and extract the songIDs
     	 *  The format is "ERROR: <songID trackID> song_title"
     	 */
-    	DataSet<Tuple1<String>> mismatches = env.readTextFile(args[1]).map(
-    			new MapFunction<String, Tuple1<String>>() {
-					public Tuple1<String> map(String value) {
-						// TODO Auto-generated method stub
-						return null;
-					}
-		});
-
-    	// filter out the mismatches from the triplets dataset
+    	DataSet<Tuple1<String>> mismatches = env.readTextFile(args[1]).map(new ExtractMismatchSongIds());
+
+    	/**
+    	 *  Filter out the mismatches from the triplets dataset
+    	 */
     	DataSet<Tuple3<String, String, Integer>> validTriplets = triplets.coGroup(mismatches)
-    			.where(1).equalTo(0).with(new CoGroupFunction<Tuple3<String, String, Integer>, Tuple1<String>, 
-    					Tuple3<String, String, Integer>>() {
-							public void coGroup(
-									Iterable<Tuple3<String, String, Integer>> triplets,
-									Iterable<Tuple1<String>> invalidSongs,
-									Collector<Tuple3<String, String, Integer>> out) {
-								if (!invalidSongs.iterator().hasNext()) {
-									// this is a valid triplet
-									out.collect(triplets.iterator().next());
-								}
-							}
-				});
-
-    	// Create a user -> song weighted bipartite graph
-    	// where the edge weights correspond to play counts
-    	DataSet<Edge<String, Integer>> userSongEdges = validTriplets.map(
-    			new Tuple3ToEdgeMap<String, Integer>());
+    			.where(1).equalTo(0).with(new FilterOutMismatches());
 
+    	/**
+    	 *  Create a user -> song weighted bipartite graph
+    	 *  where the edge weights correspond to play counts
+    	 */
+    	DataSet<Edge<String, Integer>> userSongEdges = validTriplets.map(new Tuple3ToEdgeMap<String, Integer>());
     	Graph<String, NullValue, Integer> userSongGraph = Graph.create(userSongEdges, env);
 
-    	// get the top track (most listened) for each user
-    	DataSet<Tuple2<String, String>> usersWithTopTrack = userSongGraph.reduceOnEdges(
-    			new EdgesFunction<String, Integer, String>() {
-					public Tuple2<String, String> iterateEdges(
-							Iterable<Tuple2<String, Edge<String, Integer>>> edges) {
-						int maxPlaycount = 0;
-						String userId = ""; 
-						String topSong = "";
-
-						final Iterator<Tuple2<String, Edge<String, Integer>>> edgesIterator = 
-								edges.iterator();
-						if (edgesIterator.hasNext()) {
-							Tuple2<String, Edge<String, Integer>> first = edgesIterator.next();
-							userId = first.f0;
-							topSong = first.f1.getTarget();
-						}
-						while (edgesIterator.hasNext()) {
-							Tuple2<String, Edge<String, Integer>> edge = edgesIterator.next();
-							if (edge.f1.getValue() > maxPlaycount) {
-								maxPlaycount = edge.f1.getValue();
-								topSong = edge.f1.getTarget();
-							}
-						}
-						return new Tuple2<String, String> (userId, topSong);
-					}
-		}, EdgeDirection.OUT);
+    	/**
+    	 *  Get the top track (most listened) for each user
+    	 */
+    	DataSet<Tuple2<String, String>> usersWithTopTrack = userSongGraph.reduceOnEdges(new GetTopSongPerUser(), 
+    			EdgeDirection.OUT);
+
+    	/**
+    	 * Create a user-user similarity graph, based on common songs, 
+    	 * i.e. two users that listen to the same song are connected.
+    	 * For each song, we create an edge between each pair of its in-neighbors.
+    	 */
+    	DataSet<Edge<String, NullValue>> similarUsers = userSongGraph.getEdges().groupBy(1)
+    			.reduceGroup(new CreateSimilarUserEdges()).distinct();
+    	Graph<String, NullValue, NullValue> similarUsersGraph = Graph.create(similarUsers, env).getUndirected();
+
     }
 
+    @SuppressWarnings("serial")
+	public static final class ExtractMismatchSongIds implements MapFunction<String, Tuple1<String>> {
+		public Tuple1<String> map(String value) {
+			String[] tokens = value.split("\\s+"); 
+			String songId = tokens[1].substring(1);
+			return new Tuple1<String>(songId);
+		}
+    }
+
+    @SuppressWarnings("serial")
+	public static final class FilterOutMismatches implements CoGroupFunction<Tuple3<String, String, Integer>, 
+    	Tuple1<String>, Tuple3<String, String, Integer>> {
+		public void coGroup(
+				Iterable<Tuple3<String, String, Integer>> triplets,
+				Iterable<Tuple1<String>> invalidSongs,
+				Collector<Tuple3<String, String, Integer>> out) {
+			if (!invalidSongs.iterator().hasNext()) {
+				// this is a valid triplet
+				out.collect(triplets.iterator().next());
+			}
+		}
+    }
+
+    @SuppressWarnings("serial")
+	public static final class GetTopSongPerUser implements EdgesFunctionWithVertexValue
+		<String, NullValue, Integer, Tuple2<String, String>> {
+		public Tuple2<String, String> iterateEdges(Vertex<String, NullValue> vertex,
+				Iterable<Edge<String, Integer>> edges) {
+			int maxPlaycount = 0;
+			String topSong = "";
+			for (Edge<String, Integer> edge: edges) {
+				if (edge.getValue() > maxPlaycount) {
+					maxPlaycount = edge.getValue();
+					topSong = edge.getTarget();
+				}
+			}
+			return new Tuple2<String, String> (vertex.getId(), topSong);
+		}
+    }
+
+    @SuppressWarnings("serial")
+ 	public static final class CreateSimilarUserEdges implements GroupReduceFunction<Edge<String, Integer>,
+ 		Edge<String, NullValue>> {
+		public void reduce(Iterable<Edge<String, Integer>> edges, Collector<Edge<String, NullValue>> out) {
+			List<String> listeners = new ArrayList<String>();
+			for (Edge<String, Integer> edge : edges) {
+				listeners.add(edge.getSource());
+			}
+			for (int i=0; i < listeners.size()-1; i++) {
+				out.collect(new Edge<String, NullValue>(listeners.get(i), listeners.get(i+1)));
+			}
+		}
+    }
+	
 	@Override
 	public String getDescription() {
 		return null;


Mime
View raw message