flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [2/2] flink git commit: [streaming] Added ITCase for streaming classloading
Date Fri, 13 Feb 2015 13:58:42 GMT
[streaming] Added ITCase for streaming classloading


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bec2287
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bec2287
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bec2287

Branch: refs/heads/master
Commit: 6bec2287789f28c21316e67e1516a98d88aa786c
Parents: cba7866
Author: mbalassi <mbalassi@apache.org>
Authored: Thu Feb 12 22:20:14 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Fri Feb 13 14:05:35 2015 +0100

----------------------------------------------------------------------
 flink-tests/pom.xml                             |  30 +++-
 .../test/assembly/test-custominput-assembly.xml |   3 +-
 .../test-streamingclassloader-assembly.xml      |  39 +++++
 .../InputSplitClassLoaderITCase.java            |   2 +-
 .../StreamingClassLoaderITCase.java             |  58 +++++++
 .../jar/CustomInputSpitProgram.java             | 172 -------------------
 .../jar/CustomInputSplitProgram.java            | 172 +++++++++++++++++++
 .../test/classloading/jar/StreamingProgram.java |  63 +++++++
 8 files changed, 363 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6bec2287/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 467067d..985c675 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -78,6 +78,13 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 		
 		<dependency>
 			<groupId>org.apache.flink</groupId>
@@ -330,7 +337,7 @@ under the License.
 						<configuration>
 							<archive>
 								<manifest>
-									<mainClass>org.apache.flink.test.classloading.jar.CustomInputSpitProgram</mainClass>
+									<mainClass>org.apache.flink.test.classloading.jar.CustomInputSplitProgram</mainClass>
 								</manifest>
 							</archive>
 							<finalName>customsplit</finalName>
@@ -340,6 +347,25 @@ under the License.
 							</descriptors>
 						</configuration>
 					</execution>
+					<execution>
+						<id>create-streamingclassloader-jar</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+						<configuration>
+							<archive>
+								<manifest>
+									<mainClass>org.apache.flink.test.classloading.jar.StreamingProgram</mainClass>
+								</manifest>
+							</archive>
+							<finalName>streamingclassloader</finalName>
+							<attach>false</attach>
+							<descriptors>
+								<descriptor>src/test/assembly/test-streamingclassloader-assembly.xml</descriptor>
+							</descriptors>
+						</configuration>
+					</execution>
 				</executions>
 			</plugin>
 
@@ -369,7 +395,7 @@ under the License.
 						</configuration>
 					</execution>
 					<execution>
-						<id>remove-custominputformat-test-dependencies</id>
+						<id>remove-classloading-test-dependencies</id>
 						<phase>process-test-classes</phase>
 						<goals>
 							<goal>clean</goal>

http://git-wip-us.apache.org/repos/asf/flink/blob/6bec2287/flink-tests/src/test/assembly/test-custominput-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/assembly/test-custominput-assembly.xml b/flink-tests/src/test/assembly/test-custominput-assembly.xml
index 9d4800b..e6f3568 100644
--- a/flink-tests/src/test/assembly/test-custominput-assembly.xml
+++ b/flink-tests/src/test/assembly/test-custominput-assembly.xml
@@ -30,7 +30,8 @@ under the License.
 			<outputDirectory>/</outputDirectory>
 			<!--modify/add include to match your package(s) -->
 			<includes>
-				<include>org/apache/flink/test/classloading/jar/**</include>
+				<include>org/apache/flink/test/classloading/jar/CustomInputSplitProgram.class</include>
+				<include>org/apache/flink/test/classloading/jar/CustomInputSplitProgram$*.class</include>
 			</includes>
 		</fileSet>
 	</fileSets>

http://git-wip-us.apache.org/repos/asf/flink/blob/6bec2287/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml b/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml
new file mode 100644
index 0000000..8321b21
--- /dev/null
+++ b/flink-tests/src/test/assembly/test-streamingclassloader-assembly.xml
@@ -0,0 +1,39 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+
+<assembly>
+	<id>test-jar</id>
+	<formats>
+		<format>jar</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>${project.build.testOutputDirectory}</directory>
+			<outputDirectory>/</outputDirectory>
+			<!--modify/add include to match your package(s) -->
+			<includes>
+				<include>org/apache/flink/test/classloading/jar/StreamingProgram.class</include>
+				<include>org/apache/flink/test/classloading/jar/StreamingProgram$*.class</include>
+				<include>org/apache/flink/test/testdata/WordCountData.class</include>
+			</includes>
+		</fileSet>
+	</fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6bec2287/flink-tests/src/test/java/org/apache/flink/test/classloading/InputSplitClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/InputSplitClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/InputSplitClassLoaderITCase.java
index 24cce08..535273f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/InputSplitClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/InputSplitClassLoaderITCase.java
@@ -42,7 +42,7 @@ public class InputSplitClassLoaderITCase {
 			ForkableFlinkMiniCluster testCluster = new ForkableFlinkMiniCluster(config, false);
 			try {
 				int port = testCluster.getJobManagerRPCPort();
-				
+
 				PackagedProgram prog = new PackagedProgram(new File(JAR_FILE),
 						new String[] { JAR_FILE, "localhost", String.valueOf(port) } );
 				prog.invokeInteractiveModeForExecution();

http://git-wip-us.apache.org/repos/asf/flink/blob/6bec2287/flink-tests/src/test/java/org/apache/flink/test/classloading/StreamingClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/StreamingClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/StreamingClassLoaderITCase.java
new file mode 100644
index 0000000..103e126
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/StreamingClassLoaderITCase.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.classloading;
+
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+
+public class StreamingClassLoaderITCase {
+	
+	private static final String JAR_FILE = "target/streamingclassloader-test-jar.jar";
+	
+	@Test
+	public void testStreamingJob() {
+		try {
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+			
+			ForkableFlinkMiniCluster testCluster = new ForkableFlinkMiniCluster(config, false);
+			try {
+				int port = testCluster.getJobManagerRPCPort();
+
+				PackagedProgram prog = new PackagedProgram(new File(JAR_FILE),
+						new String[] { JAR_FILE, "localhost", String.valueOf(port) } );
+				prog.invokeInteractiveModeForExecution();
+			}
+			finally {
+				testCluster.shutdown();
+			}
+		}
+		catch (Throwable t) {
+			t.printStackTrace();
+			Assert.fail(t.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6bec2287/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSpitProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSpitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSpitProgram.java
deleted file mode 100644
index 52cfa02..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSpitProgram.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.classloading.jar;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-@SuppressWarnings("serial")
-public class CustomInputSpitProgram {
-	
-	public static void main(String[] args) throws Exception {
-		
-		final String jarFile = args[0];
-		final String host = args[1];
-		final int port = Integer.parseInt(args[2]);
-		
-		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
-
-		DataSet<Integer> data = env.createInput(new CustomInputFormat());
-
-		data
-			.map(new MapFunction<Integer, Tuple2<Integer, Double>>() {
-				@Override
-				public Tuple2<Integer, Double> map(Integer value) {
-					return new Tuple2<Integer, Double>(value, value * 0.5);
-				}
-			})
-			.output(new DiscardingOutputFormat<Tuple2<Integer,Double>>());
-
-		env.execute();
-	}
-	// --------------------------------------------------------------------------------------------
-	
-	public static final class CustomInputFormat implements InputFormat<Integer, CustomInputSplit>, ResultTypeQueryable<Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		private Integer value;
-
-		@Override
-		public void configure(Configuration parameters) {}
-
-		@Override
-		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
-			return null;
-		}
-
-		@Override
-		public CustomInputSplit[] createInputSplits(int minNumSplits) {
-			CustomInputSplit[] splits = new CustomInputSplit[minNumSplits];
-			for (int i = 0; i < minNumSplits; i++) {
-				splits[i] = new CustomInputSplit(i);
-			}
-			return splits;
-		}
-
-		@Override
-		public InputSplitAssigner getInputSplitAssigner(CustomInputSplit[] inputSplits) {
-			return new CustomSplitAssigner(inputSplits);
-		}
-
-		@Override
-		public void open(CustomInputSplit split) {
-			this.value = split.getSplitNumber();
-		}
-
-		@Override
-		public boolean reachedEnd() {
-			return this.value == null;
-		}
-
-		@Override
-		public Integer nextRecord(Integer reuse) {
-			Integer val = this.value;
-			this.value = null;
-			return val;
-		}
-
-		@Override
-		public void close() {}
-
-		@Override
-		public TypeInformation<Integer> getProducedType() {
-			return BasicTypeInfo.INT_TYPE_INFO;
-		}
-	}
-
-	public static final class CustomInputSplit implements InputSplit {
-
-		private static final long serialVersionUID = 1L;
-
-		private int splitNumber;
-
-		public CustomInputSplit() {
-			this(-1);
-		}
-
-		public CustomInputSplit(int splitNumber) {
-			this.splitNumber = splitNumber;
-		}
-
-		@Override
-		public int getSplitNumber() {
-			return this.splitNumber;
-		}
-
-		@Override
-		public void write(DataOutputView out) throws IOException {
-			out.writeInt(splitNumber);
-		}
-
-		@Override
-		public void read(DataInputView in) throws IOException {
-			splitNumber = in.readInt();
-		}
-	}
-
-	public static final class CustomSplitAssigner implements InputSplitAssigner {
-
-		private final List<CustomInputSplit> remainingSplits;
-
-		public CustomSplitAssigner(CustomInputSplit[] splits) {
-			this.remainingSplits = new ArrayList<CustomInputSplit>(Arrays.asList(splits));
-		}
-
-		@Override
-		public InputSplit getNextInputSplit(String host, int taskId) {
-			synchronized (this) {
-				int size = remainingSplits.size();
-				if (size > 0) {
-					return remainingSplits.remove(size - 1);
-				} else {
-					return null;
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6bec2287/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
new file mode 100644
index 0000000..a5fe8c9
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.classloading.jar;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+@SuppressWarnings("serial")
+public class CustomInputSplitProgram {
+	
+	public static void main(String[] args) throws Exception {
+		
+		final String jarFile = args[0];
+		final String host = args[1];
+		final int port = Integer.parseInt(args[2]);
+		
+		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+
+		DataSet<Integer> data = env.createInput(new CustomInputFormat());
+
+		data
+			.map(new MapFunction<Integer, Tuple2<Integer, Double>>() {
+				@Override
+				public Tuple2<Integer, Double> map(Integer value) {
+					return new Tuple2<Integer, Double>(value, value * 0.5);
+				}
+			})
+			.output(new DiscardingOutputFormat<Tuple2<Integer,Double>>());
+
+		env.execute();
+	}
+	// --------------------------------------------------------------------------------------------
+	
+	public static final class CustomInputFormat implements InputFormat<Integer, CustomInputSplit>, ResultTypeQueryable<Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		private Integer value;
+
+		@Override
+		public void configure(Configuration parameters) {}
+
+		@Override
+		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+			return null;
+		}
+
+		@Override
+		public CustomInputSplit[] createInputSplits(int minNumSplits) {
+			CustomInputSplit[] splits = new CustomInputSplit[minNumSplits];
+			for (int i = 0; i < minNumSplits; i++) {
+				splits[i] = new CustomInputSplit(i);
+			}
+			return splits;
+		}
+
+		@Override
+		public InputSplitAssigner getInputSplitAssigner(CustomInputSplit[] inputSplits) {
+			return new CustomSplitAssigner(inputSplits);
+		}
+
+		@Override
+		public void open(CustomInputSplit split) {
+			this.value = split.getSplitNumber();
+		}
+
+		@Override
+		public boolean reachedEnd() {
+			return this.value == null;
+		}
+
+		@Override
+		public Integer nextRecord(Integer reuse) {
+			Integer val = this.value;
+			this.value = null;
+			return val;
+		}
+
+		@Override
+		public void close() {}
+
+		@Override
+		public TypeInformation<Integer> getProducedType() {
+			return BasicTypeInfo.INT_TYPE_INFO;
+		}
+	}
+
+	public static final class CustomInputSplit implements InputSplit {
+
+		private static final long serialVersionUID = 1L;
+
+		private int splitNumber;
+
+		public CustomInputSplit() {
+			this(-1);
+		}
+
+		public CustomInputSplit(int splitNumber) {
+			this.splitNumber = splitNumber;
+		}
+
+		@Override
+		public int getSplitNumber() {
+			return this.splitNumber;
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			out.writeInt(splitNumber);
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			splitNumber = in.readInt();
+		}
+	}
+
+	public static final class CustomSplitAssigner implements InputSplitAssigner {
+
+		private final List<CustomInputSplit> remainingSplits;
+
+		public CustomSplitAssigner(CustomInputSplit[] splits) {
+			this.remainingSplits = new ArrayList<CustomInputSplit>(Arrays.asList(splits));
+		}
+
+		@Override
+		public InputSplit getNextInputSplit(String host, int taskId) {
+			synchronized (this) {
+				int size = remainingSplits.size();
+				if (size > 0) {
+					return remainingSplits.remove(size - 1);
+				} else {
+					return null;
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6bec2287/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
new file mode 100644
index 0000000..6799d9b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/StreamingProgram.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.classloading.jar;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.util.Collector;
+
+import java.util.StringTokenizer;
+
+@SuppressWarnings("serial")
+public class StreamingProgram {
+	
+	public static void main(String[] args) throws Exception {
+		
+		final String jarFile = args[0];
+		final String host = args[1];
+		final int port = Integer.parseInt(args[2]);
+		
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+
+		DataStream<String> text = env.fromElements(WordCountData.TEXT);
+
+		DataStream<Tuple2<String, Integer>> counts =
+				text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
+					@Override
+					public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
+						StringTokenizer tokenizer = new StringTokenizer(value);
+						while (tokenizer.hasMoreTokens()){
+							out.collect(new Tuple2<String, Integer>(tokenizer.nextToken(), 1));
+						}
+					}
+				}).groupBy(0).sum(1);
+
+		counts.addSink(new SinkFunction<Tuple2<String, Integer>>() {
+			@Override
+			public void invoke(Tuple2<String, Integer> value) throws Exception {
+			}
+		});
+
+		env.execute();
+	}
+}


Mime
View raw message