flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [1/5] incubator-flink git commit: [streaming] Removed obsolete Join example
Date Thu, 13 Nov 2014 15:06:14 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 818ebda0f -> d332d6c31


[streaming] Removed obsolete Join example


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/537d6f6d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/537d6f6d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/537d6f6d

Branch: refs/heads/master
Commit: 537d6f6da529d534f9c523c0b208861b381e3905
Parents: 818ebda
Author: Marton Balassi <mbalassi@apache.org>
Authored: Mon Nov 10 14:15:05 2014 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Thu Nov 13 15:24:03 2014 +0100

----------------------------------------------------------------------
 .../streaming/examples/join/GradeSource.java    | 45 -----------
 .../streaming/examples/join/JoinLocal.java      | 53 ------------
 .../flink/streaming/examples/join/JoinTask.java | 84 --------------------
 .../streaming/examples/join/SalarySource.java   | 45 -----------
 4 files changed, 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/537d6f6d/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/GradeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/GradeSource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/GradeSource.java
deleted file mode 100644
index 93bcdab..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/GradeSource.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.join;
-
-import java.util.Random;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-public class GradeSource implements SourceFunction<Tuple2<String, Integer>> {
-
-	private static final long serialVersionUID = -5897483980082089771L;
-
-	private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
-			"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
-	private Random rand = new Random();
-	private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
-
-	@Override
-	public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
-		// Continuously emit tuples with random names and integers (grades).
-		while (true) {
-			outTuple.f0 = names[rand.nextInt(names.length)];
-			outTuple.f1 = rand.nextInt(5) + 1;
-
-			out.collect(outTuple);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/537d6f6d/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
deleted file mode 100644
index 717ad8e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.join;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-public class JoinLocal {
-
-	private static final int PARALLELISM = 1;
-	private static final int SOURCE_PARALLELISM = 1;
-
-	// This example will join two streams. One which emits people's grades and
-	// one which emits people's salaries.
-
-	public static void main(String[] args) throws Exception {
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
-				PARALLELISM).setBufferTimeout(100);
-
-		DataStream<Tuple2<String, Integer>> grades = env.addSource(new GradeSource(),
-				SOURCE_PARALLELISM);
-		
-		DataStream<Tuple2<String, Integer>> salaries = env.addSource(new SalarySource(),
-				SOURCE_PARALLELISM);
-
-		DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades.connect(salaries)
-				.flatMap(new JoinTask());
-		
-		System.out.println("(NAME, GRADE, SALARY)");
-		joinedStream.print();
-
-		env.execute();
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/537d6f6d/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java
deleted file mode 100644
index 35e18f4..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinTask.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.join;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.function.co.RichCoFlatMapFunction;
-import org.apache.flink.util.Collector;
-
-//Joins the input value with the already known values. If it is a grade
-// then with the salaries, if it is a salary then with the grades. Also
-// stores the new element.
-public class JoinTask extends
-		RichCoFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> {
-	private static final long serialVersionUID = 1L;
-
-	private HashMap<String, ArrayList<Integer>> gradeHashmap;
-	private HashMap<String, ArrayList<Integer>> salaryHashmap;
-	private String name;
-
-	public JoinTask() {
-		gradeHashmap = new HashMap<String, ArrayList<Integer>>();
-		salaryHashmap = new HashMap<String, ArrayList<Integer>>();
-		name = new String();
-	}
-
-	Tuple3<String, Integer, Integer> outputTuple = new Tuple3<String, Integer, Integer>();
-
-	// GRADES
-	@Override
-	public void flatMap1(Tuple2<String, Integer> value,
-			Collector<Tuple3<String, Integer, Integer>> out) {
-		name = value.f0;
-		outputTuple.f0 = name;
-		outputTuple.f1 = value.f1;
-		if (salaryHashmap.containsKey(name)) {
-			for (Integer salary : salaryHashmap.get(name)) {
-				outputTuple.f2 = salary;
-				out.collect(outputTuple);
-			}
-		}
-		if (!gradeHashmap.containsKey(name)) {
-			gradeHashmap.put(name, new ArrayList<Integer>());
-		}
-		gradeHashmap.get(name).add(value.f1);
-	}
-
-	// SALARIES
-	@Override
-	public void flatMap2(Tuple2<String, Integer> value,
-			Collector<Tuple3<String, Integer, Integer>> out) {
-		name = value.f0;
-		outputTuple.f0 = name;
-		outputTuple.f2 = value.f1;
-		if (gradeHashmap.containsKey(name)) {
-			for (Integer grade : gradeHashmap.get(name)) {
-				outputTuple.f1 = grade;
-				out.collect(outputTuple);
-			}
-		}
-		if (!salaryHashmap.containsKey(name)) {
-			salaryHashmap.put(name, new ArrayList<Integer>());
-		}
-		salaryHashmap.get(name).add(value.f1);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/537d6f6d/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/SalarySource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/SalarySource.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/SalarySource.java
deleted file mode 100644
index 988935f..0000000
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/SalarySource.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.examples.join;
-
-import java.util.Random;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-import org.apache.flink.util.Collector;
-
-public class SalarySource implements SourceFunction<Tuple2<String, Integer>> {
-
-	private static final long serialVersionUID = 1L;
-
-	private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", "sasa", "lawrance",
-			"andrew", "jean", "richard", "smith", "gorge", "black", "peter" };
-	private Random rand = new Random();
-	private Tuple2<String, Integer> outTuple = new Tuple2<String, Integer>();
-
-	@Override
-	public void invoke(Collector<Tuple2<String, Integer>> out) throws Exception {
-		// Continuously emit tuples with random names and integers (salaries).
-		while (true) {
-
-			outTuple.f0 = names[rand.nextInt(names.length)];
-			outTuple.f1 = rand.nextInt(10000);
-			out.collect(outTuple);
-		}
-	}
-}


Mime
View raw message