flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [37/50] [abbrv] flink git commit: [FLINK-1201] [gelly] refactored methods for Graph creation
Date Wed, 11 Feb 2015 10:49:39 GMT
[FLINK-1201] [gelly] refactored methods for Graph creation

- constructors are now private
- create() methods have been renamed and refactored to fromDataSet()
- fromCollection() methods have been refactored
- overall consistency in creating new Graph objects, using the constructor now in all places

fixes #53


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0498abc8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0498abc8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0498abc8

Branch: refs/heads/master
Commit: 0498abc8501c58ba1b702721ed090146aff88d61
Parents: 0bb7b7f
Author: Carsten Brandt <mail@cebe.cc>
Authored: Sat Jan 17 00:56:22 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 11 10:46:15 2015 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java | 347 +++++++++----------
 1 file changed, 156 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0498abc8/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 425a377..307f177 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -75,27 +75,27 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 	private boolean isUndirected;
 
 	/**
-	 * Creates a graph from two datasets: vertices and edges
+	 * Creates a graph from two DataSets: vertices and edges
 	 *
 	 * @param vertices a DataSet of vertices.
 	 * @param edges a DataSet of vertices.
 	 * @param context the flink execution environment.
 	 */
-	public Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>>
edges, ExecutionEnvironment context) {
+	private Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>>
edges, ExecutionEnvironment context) {
 
 		/** a graph is directed by default */
 		this(vertices, edges, context, false);
 	}
 
 	/**
-	 * Creates a graph from two datasets: vertices and edges and allow setting the undirected
property
+	 * Creates a graph from two DataSets: vertices and edges and allow setting the undirected
property
 	 *
 	 * @param vertices a DataSet of vertices.
 	 * @param edges a DataSet of vertices.
 	 * @param context the flink execution environment.
 	 * @param undirected whether this is an undirected graph
 	 */
-	public Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>>
edges, ExecutionEnvironment context,
+	private Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>>
edges, ExecutionEnvironment context,
 			boolean undirected) {
 		this.vertices = vertices;
 		this.edges = edges;
@@ -104,6 +104,139 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 	}
 
 	/**
+	 * Creates a graph from a Collection of vertices and a Collection of edges.
+	 * @param vertices a Collection of vertices.
+	 * @param edges a Collection of vertices.
+	 * @param context the flink execution environment.
+	 * @return the newly created graph.
+	 */
+	public static <K extends Comparable<K> & Serializable, VV extends Serializable,
EV extends Serializable>
+		Graph<K, VV, EV> fromCollection (Collection<Vertex<K,VV>> vertices,
+										 Collection<Edge<K,EV>> edges,
+										 ExecutionEnvironment context) {
+
+		return fromDataSet(context.fromCollection(vertices), context.fromCollection(edges), context);
+	}
+
+	/**
+	 * Creates a graph from a Collection of edges, vertices are induced from the edges.
+	 * Vertices are created automatically and their values are set to NullValue.
+	 * @param edges a Collection of vertices.
+	 * @param context the flink execution environment.
+	 * @return the newly created graph.
+	 */
+	public static <K extends Comparable<K> & Serializable, EV extends Serializable>
+		Graph<K, NullValue, EV> fromCollection (Collection<Edge<K,EV>> edges,
ExecutionEnvironment context) {
+
+		return fromDataSet(context.fromCollection(edges), context);
+	}
+
+	/**
+	 * Creates a graph from a Collection of edges, vertices are induced from the edges and
+	 * vertex values are calculated by a mapper function.
+	 * Vertices are created automatically and their values are set
+	 * by applying the provided map function to the vertex ids.
+	 * @param edges a Collection of vertices.
+	 * @param mapper the mapper function.
+	 * @param context the flink execution environment.
+	 * @return the newly created graph.
+	 */
+	public static <K extends Comparable<K> & Serializable, VV extends Serializable,
EV extends Serializable>
+		Graph<K, VV, EV> fromCollection (Collection<Edge<K,EV>> edges,
+										 final MapFunction<K, VV> mapper,
+										 ExecutionEnvironment context) {
+
+		return fromDataSet(context.fromCollection(edges), mapper, context);
+	}
+
+	/**
+	 * Creates a graph from a DataSet of vertices and a DataSet of edges.
+	 * @param vertices a DataSet of vertices.
+	 * @param edges a DataSet of vertices.
+	 * @param context the flink execution environment.
+	 * @return the newly created graph.
+	 */
+	public static <K extends Comparable<K> & Serializable, VV extends Serializable,
EV extends Serializable>
+		Graph<K, VV, EV> fromDataSet (DataSet<Vertex<K,VV>> vertices,
+									  DataSet<Edge<K,EV>> edges,
+									  ExecutionEnvironment context) {
+
+		return new Graph<K, VV, EV>(vertices, edges, context);
+	}
+
+	/**
+	 * Creates a graph from a DataSet of edges, vertices are induced from the edges.
+	 * Vertices are created automatically and their values are set to NullValue.
+	 * @param edges a DataSet of vertices.
+	 * @param context the flink execution environment.
+	 * @return the newly created graph.
+	 */
+	public static <K extends Comparable<K> & Serializable, EV extends Serializable>
+		Graph<K, NullValue, EV> fromDataSet (DataSet<Edge<K,EV>> edges,
+											 ExecutionEnvironment context) {
+
+		DataSet<Vertex<K, NullValue>> vertices =
+				edges.flatMap(new EmitSrcAndTarget<K, EV>()).distinct();
+
+		return new Graph<K, NullValue, EV>(vertices, edges, context);
+	}
+
+	private static final class EmitSrcAndTarget<K extends Comparable<K> & Serializable,
EV extends Serializable>
+			implements FlatMapFunction<Edge<K, EV>, Vertex<K, NullValue>> {
+
+		public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, NullValue>>
out) {
+			out.collect(new Vertex<K, NullValue>(edge.f0, NullValue.getInstance()));
+			out.collect(new Vertex<K, NullValue>(edge.f1, NullValue.getInstance()));
+		}
+	}
+
+	/**
+	 * Creates a graph from a DataSet of edges, vertices are induced from the edges and
+	 * vertex values are calculated by a mapper function.
+	 * Vertices are created automatically and their values are set
+	 * by applying the provided map function to the vertex ids.
+	 * @param edges a DataSet of vertices.
+	 * @param mapper the mapper function.
+	 * @param context the flink execution environment.
+	 * @return the newly created graph.
+	 */
+	public static <K extends Comparable<K> & Serializable, VV extends Serializable,
EV extends Serializable>
+		Graph<K, VV, EV> fromDataSet (DataSet<Edge<K,EV>> edges,
+									  final MapFunction<K, VV> mapper,
+									  ExecutionEnvironment context) {
+
+		TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
+
+		TypeInformation<VV> valueType = TypeExtractor
+				.createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null);
+
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		TypeInformation<Vertex<K, VV>> returnType = (TypeInformation<Vertex<K,
VV>>)
+				new TupleTypeInfo(Vertex.class, keyType, valueType);
+
+		DataSet<Vertex<K, VV>> vertices =
+				edges.flatMap(new EmitSrcAndTargetAsTuple1<K, EV>())
+				     .distinct()
+				     .map(new MapFunction<Tuple1<K>, Vertex<K, VV>>() {
+						 public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
+							 return new Vertex<K, VV>(value.f0, mapper.map(value.f0));
+						 }
+					 })
+				     .returns(returnType);
+
+		return new Graph<K, VV, EV>(vertices, edges, context);
+	}
+
+	private static final class EmitSrcAndTargetAsTuple1<K extends Comparable<K> &
Serializable,
+			EV extends Serializable> implements FlatMapFunction<Edge<K, EV>, Tuple1<K>>
{
+
+		public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
+			out.collect(new Tuple1<K>(edge.f0));
+			out.collect(new Tuple1<K>(edge.f1));
+		}
+	}
+
+	/**
 	 * @return the flink execution environment.
 	 */
 	public ExecutionEnvironment getContext() {
@@ -119,14 +252,14 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 	}
 
 	/**
-	 * @return the vertex dataset.
+	 * @return the vertex DataSet.
 	 */
 	public DataSet<Vertex<K, VV>> getVertices() {
 		return vertices;
 	}
 
 	/**
-	 * @return the edge dataset.
+	 * @return the edge DataSet.
 	 */
 	public DataSet<Edge<K, EV>> getEdges() {
 		return edges;
@@ -156,7 +289,7 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 				})
     			.returns(returnType);
 
-        return new Graph<K, NV, EV>(mappedVertices, this.getEdges(), this.context);
+        return new Graph<K, NV, EV>(mappedVertices, this.edges, this.context);
     }
 
     /**
@@ -197,7 +330,7 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 		DataSet<Vertex<K, VV>> resultedVertices = this.getVertices()
 				.coGroup(inputDataSet).where(0).equalTo(0)
 				.with(new ApplyCoGroupToVertexValues<K, VV, T>(mapper));
-		return Graph.create(resultedVertices, this.getEdges(), this.getContext());
+		return new Graph(resultedVertices, this.edges, this.context);
 	}
 
 	private static final class ApplyCoGroupToVertexValues<K extends Comparable<K> &
Serializable,
@@ -243,7 +376,7 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 		DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
 				.coGroup(inputDataSet).where(0,1).equalTo(0,1)
 				.with(new ApplyCoGroupToEdgeValues<K, EV, T>(mapper));
-		return Graph.create(this.getVertices(), resultedEdges, this.getContext());
+		return new Graph(this.vertices, resultedEdges, this.context);
 	}
 
 	private static final class ApplyCoGroupToEdgeValues<K extends Comparable<K> &
Serializable,
@@ -292,7 +425,7 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 				.coGroup(inputDataSet).where(0).equalTo(0)
 				.with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper));
 
-		return Graph.create(this.getVertices(), resultedEdges, this.getContext());
+		return new Graph(this.vertices, resultedEdges, this.context);
 	}
 
 	private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K extends Comparable<K>
& Serializable,
@@ -344,7 +477,7 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 				.coGroup(inputDataSet).where(1).equalTo(0)
 				.with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper));
 
-		return Graph.create(this.getVertices(), resultedEdges, this.getContext());
+		return new Graph(this.vertices, resultedEdges, this.context);
 	}
 
 	/**
@@ -672,125 +805,11 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 		}
 		else {
 			DataSet<Edge<K, EV>> undirectedEdges = edges.map(new ReverseEdgesMap<K,
EV>());
-			return new Graph<K, VV, EV>(vertices, (DataSet<Edge<K, EV>>) undirectedEdges,
this.context, true);
+			return new Graph<K, VV, EV>(this.vertices, undirectedEdges, this.context, true);
 		}
 	}
 
 	/**
-	 * Creates a graph from a dataset of vertices and a dataset of edges
-	 * @param vertices a DataSet of vertices.
-	 * @param edges a DataSet of vertices.
-	 * @param context the flink execution environment.
-	 * @return the newly created graph
-	 */
-	public static <K extends Comparable<K> & Serializable, VV extends Serializable,
-		EV extends Serializable> Graph<K, VV, EV>
-		create(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges,

-				ExecutionEnvironment context) {
-		return new Graph<K, VV, EV>(vertices, edges, context);
-	}
-	
-	/**
-	 * Creates a graph from a DataSet of edges.
-	 * Vertices are created automatically and their values are set to NullValue.
-	 * @param edges a DataSet of vertices.
-	 * @param context the flink execution environment.
-	 * @return the newly created graph
-	 */
-	public static <K extends Comparable<K> & Serializable, EV extends Serializable>

-		Graph<K, NullValue, EV> create(DataSet<Edge<K, EV>> edges, ExecutionEnvironment
context) {
-		DataSet<Vertex<K, NullValue>> vertices = 
-				edges.flatMap(new EmitSrcAndTarget<K, EV>()).distinct(); 
-		return new Graph<K, NullValue, EV>(vertices, edges, context);
-	}
-	
-	/**
-	 * Creates a graph from a DataSet of edges.
-	 * Vertices are created automatically and their values are set
-	 * by applying the provided map function to the vertex ids.
-	 * @param edges the input edges
-	 * @param mapper the map function to set the initial vertex value
-	 * @return the newly created graph
-	 */
-	public static <K extends Comparable<K> & Serializable, VV extends Serializable,
EV extends Serializable> 
-		Graph<K, VV, EV> create(DataSet<Edge<K, EV>> edges, final MapFunction<K,
VV> mapper, 
-				ExecutionEnvironment context) {
-		TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
-
-		TypeInformation<VV> valueType = TypeExtractor
-				.createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null);
-
-		@SuppressWarnings({ "unchecked", "rawtypes" })
-		TypeInformation<Vertex<K, VV>> returnType = (TypeInformation<Vertex<K,
VV>>)
-				new TupleTypeInfo(Vertex.class, keyType, valueType);
-
-		DataSet<Vertex<K, VV>> vertices = 
-				edges.flatMap(new EmitSrcAndTargetAsTuple1<K, EV>())
-				.distinct().map(new MapFunction<Tuple1<K>, Vertex<K, VV>>(){
-					public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
-						return new Vertex<K, VV>(value.f0, mapper.map(value.f0));
-					}
-				}).returns(returnType);
-		return new Graph<K, VV, EV>(vertices, edges, context);
-	}
-
-	private static final class EmitSrcAndTarget<K extends Comparable<K> & Serializable,
EV extends Serializable>
-		implements FlatMapFunction<Edge<K, EV>, Vertex<K, NullValue>> {
-		public void flatMap(Edge<K, EV> edge,
-				Collector<Vertex<K, NullValue>> out) {
-
-				out.collect(new Vertex<K, NullValue>(edge.f0, NullValue.getInstance()));
-				out.collect(new Vertex<K, NullValue>(edge.f1, NullValue.getInstance()));
-		}	
-	}
-
-	private static final class EmitSrcAndTargetAsTuple1<K extends Comparable<K> &
Serializable, 
-		EV extends Serializable> implements FlatMapFunction<Edge<K, EV>, Tuple1<K>>
{
-		public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
-
-			out.collect(new Tuple1<K>(edge.f0));
-			out.collect(new Tuple1<K>(edge.f1));
-		}	
-	}
-
-	/**
-	 * Read and create the graph vertex Tuple2 DataSet from a csv file
-	 *
-	 * The CSV file should be of the following format:
-	 *
-	 * <vertexID><delimiter><vertexValue>
-	 *
-	 * For example, with space delimiter:
-	 *
-	 * 1 57
-	 * 2 45
-	 * 3 77
-	 * 4 12
-	 *
-	 * @param context the flink execution environment.
-	 * @param filePath the path to the CSV file.
-	 * @param delimiter the CSV delimiter.
-	 * @param Tuple2IdClass The class to use for Vertex IDs
-	 * @param Tuple2ValueClass The class to use for Vertex Values
-	 * @return a set of vertices and their values.
-	 */
-	public static <K extends Comparable<K> & Serializable, VV extends Serializable>
-		DataSet<Tuple2<K, VV>>
-		readTuple2CsvFile(ExecutionEnvironment context, String filePath,
-			char delimiter, Class<K> Tuple2IdClass, Class<VV> Tuple2ValueClass) {
-
-		CsvReader reader = new CsvReader(filePath, context);
-		DataSet<Tuple2<K, VV>> vertices = reader.fieldDelimiter(delimiter).types(Tuple2IdClass,
Tuple2ValueClass)
-		.map(new MapFunction<Tuple2<K, VV>, Tuple2<K, VV>>() {
-
-			public Tuple2<K, VV> map(Tuple2<K, VV> value) throws Exception {
-				return (Tuple2<K, VV>)value;
-			}
-		});
-		return vertices;
-	}
-
-	/**
 	 * @return Singleton DataSet containing the vertex count
 	 */
 	public DataSet<Integer> numberOfVertices () {
@@ -914,14 +933,6 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
         }
 	}
 	
-    public Graph<K, VV, EV> fromCollection (Collection<Vertex<K,VV>> vertices,
Collection<Edge<K,EV>> edges) {
-
-		DataSet<Vertex<K, VV>> v = context.fromCollection(vertices);
-		DataSet<Edge<K, EV>> e = context.fromCollection(edges);
-
-		return new Graph<K, VV, EV>(v, e, context);
-	}
-
 	/**
 	 * Adds the input vertex and edges to the graph.
 	 * If the vertex already exists in the graph, it will not be added again,
@@ -936,14 +947,14 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 
     	// Take care of empty edge set
     	if (edges.isEmpty()) {
-    		return Graph.create(getVertices().union(newVertex).distinct(), getEdges(), context);
+    		return new Graph(this.vertices.union(newVertex).distinct(), this.edges, this.context);
     	}
 
     	// Add the vertex and its edges
-    	DataSet<Vertex<K, VV>> newVertices = getVertices().union(newVertex).distinct();
-    	DataSet<Edge<K, EV>> newEdges = getEdges().union(context.fromCollection(edges));
+    	DataSet<Vertex<K, VV>> newVertices = this.vertices.union(newVertex).distinct();
+    	DataSet<Edge<K, EV>> newEdges = this.edges.union(context.fromCollection(edges));
 
-    	return Graph.create(newVertices, newEdges, context);
+    	return new Graph(newVertices, newEdges, this.context);
     }
 
     /**
@@ -956,8 +967,11 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
      * @return the new graph containing the existing vertices and edges plus the newly added
edge
      */
     public Graph<K, VV, EV> addEdge (Vertex<K,VV> source, Vertex<K,VV>
target, EV edgeValue) {
-    	Graph<K,VV,EV> partialGraph = this.fromCollection(Arrays.asList(source, target),
-				Arrays.asList(new Edge<K, EV>(source.f0, target.f0, edgeValue)));
+    	Graph<K,VV,EV> partialGraph = fromCollection(
+				Arrays.asList(source, target),
+				Arrays.asList(new Edge<K, EV>(source.f0, target.f0, edgeValue)),
+				this.context
+		);
         return this.union(partialGraph);
     }
 
@@ -1020,7 +1034,7 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
     public Graph<K, VV, EV> removeEdge (Edge<K, EV> edge) {
 		DataSet<Edge<K, EV>> newEdges = getEdges().filter(
 				new EdgeRemovalEdgeFilter<K, EV>(edge));
-        return new Graph<K, VV, EV>(this.getVertices(), newEdges, this.context);
+        return new Graph<K, VV, EV>(this.vertices, newEdges, this.context);
     }
     
     private static final class EdgeRemovalEdgeFilter<K extends Comparable<K> &
Serializable, 
@@ -1062,58 +1076,9 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
     	DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(
     	                     VertexCentricIteration.withEdges(edges,
 						vertexUpdateFunction, messagingFunction, maximumNumberOfIterations));
-		return new Graph<K, VV, EV>(newVertices, edges, context);
+		return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
     }
 
-	/**
-	 * Creates a graph from the given vertex and edge collections
-	 * @param context the flink execution environment.
-	 * @param v the collection of vertices
-	 * @param e the collection of edges
-	 * @return a new graph formed from the set of edges and vertices
-	 */
-	public static <K extends Comparable<K> & Serializable, VV extends Serializable,
-			EV extends Serializable> Graph<K, VV, EV>
-		fromCollection(ExecutionEnvironment context, Collection<Vertex<K, VV>> v,
-					   Collection<Edge<K, EV>> e) throws Exception {
-
-		DataSet<Vertex<K, VV>> vertices = context.fromCollection(v);
-		DataSet<Edge<K, EV>> edges = context.fromCollection(e);
-
-		return Graph.create(vertices, edges, context);
-	}
-
-	/**
-	 * Vertices may not have a value attached or may receive a value as a result of running
the algorithm.
-	 * @param context the flink execution environment.
-	 * @param e the collection of edges
-	 * @return a new graph formed from the edges, with no value for the vertices
-	 */
-	public static <K extends Comparable<K> & Serializable, VV extends Serializable,
-			EV extends Serializable> Graph<K, NullValue, EV>
-		fromCollection(ExecutionEnvironment context, Collection<Edge<K, EV>> e) {
-
-		DataSet<Edge<K, EV>> edges = context.fromCollection(e);
-
-		return Graph.create(edges, context);
-	}
-
-	/**
-	 * Vertices may have an initial value defined by a function.
-	 * @param context the flink execution environment.
-	 * @param e the collection of edges
-	 * @return a new graph formed from the edges, with a custom value for the vertices,
-	 * determined by the mapping function
-	 */
-	public static <K extends Comparable<K> & Serializable, VV extends Serializable,
-			EV extends Serializable> Graph<K, VV, EV>
-		fromCollection(ExecutionEnvironment context, Collection<Edge<K, EV>> e,
-					   final MapFunction<K, VV> mapper) {
-
-		DataSet<Edge<K, EV>> edges = context.fromCollection(e);
-		return Graph.create(edges, mapper, context);
-	}
-
 	public Graph<K, VV, EV> run (GraphAlgorithm<K, VV, EV> algorithm) {
 		return algorithm.run(this);
 	}


Mime
View raw message