flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [01/27] flink git commit: [FLINK-4380] Introduce KeyGroupAssigner and Max-Parallelism Parameter
Date Wed, 31 Aug 2016 17:28:19 GMT
Repository: flink
Updated Branches:
  refs/heads/master 7cd9bb5f1 -> bdf9f86c5


http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
index 5d99de4..3b98d33 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
@@ -78,16 +78,20 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 		final int numKeys = 2;
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
 		DataStream<Tuple2<Integer, Integer>> sourceStream = env.addSource(new TupleSource(numElements, numKeys));
 
 		SplitStream<Tuple2<Integer, Integer>> splittedResult = sourceStream
 			.keyBy(0)
 			.fold(0, new FoldFunction<Tuple2<Integer, Integer>, Integer>() {
+				private static final long serialVersionUID = 4875723041825726082L;
+
 				@Override
 				public Integer fold(Integer accumulator, Tuple2<Integer, Integer> value) throws Exception {
 					return accumulator + value.f1;
 				}
 			}).map(new RichMapFunction<Integer, Tuple2<Integer, Integer>>() {
+				private static final long serialVersionUID = 8538355101606319744L;
 				int key = -1;
 				@Override
 				public Tuple2<Integer, Integer> map(Integer value) throws Exception {
@@ -97,6 +101,8 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 					return new Tuple2<>(key, value);
 				}
 			}).split(new OutputSelector<Tuple2<Integer, Integer>>() {
+				private static final long serialVersionUID = -8439325199163362470L;
+
 				@Override
 				public Iterable<String> select(Tuple2<Integer, Integer> value) {
 					List<String> output = new ArrayList<>();
@@ -107,6 +113,8 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 			});
 
 		splittedResult.select("0").map(new MapFunction<Tuple2<Integer,Integer>, Integer>() {
+			private static final long serialVersionUID = 2114608668010092995L;
+
 			@Override
 			public Integer map(Tuple2<Integer, Integer> value) throws Exception {
 				return value.f1;
@@ -114,6 +122,8 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 		}).writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE);
 
 		splittedResult.select("1").map(new MapFunction<Tuple2<Integer, Integer>, Integer>() {
+			private static final long serialVersionUID = 5631104389744681308L;
+
 			@Override
 			public Integer map(Tuple2<Integer, Integer> value) throws Exception {
 				return value.f1;
@@ -149,6 +159,7 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 		final int numElements = 10;
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
 		DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements));
 
 		input
@@ -156,12 +167,16 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 			.fold(
 				new NonSerializable(42),
 				new FoldFunction<Tuple2<Integer, NonSerializable>, NonSerializable>() {
+					private static final long serialVersionUID = 2705497830143608897L;
+
 					@Override
 					public NonSerializable fold(NonSerializable accumulator, Tuple2<Integer, NonSerializable> value) throws Exception {
 						return new NonSerializable(accumulator.value + value.f1.value);
 					}
 			})
 			.map(new MapFunction<NonSerializable, Integer>() {
+				private static final long serialVersionUID = 6906984044674568945L;
+
 				@Override
 				public Integer map(NonSerializable value) throws Exception {
 					return value.value;
@@ -192,6 +207,7 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 	}
 
 	private static class NonSerializableTupleSource implements SourceFunction<Tuple2<Integer, NonSerializable>> {
+		private static final long serialVersionUID = 3949171986015451520L;
 		private final int numElements;
 
 		public NonSerializableTupleSource(int numElements) {
@@ -212,6 +228,7 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
 
 	private static class TupleSource implements SourceFunction<Tuple2<Integer, Integer>> {
 
+		private static final long serialVersionUID = -8110466235852024821L;
 		private final int numElements;
 		private final int numKeys;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java
index c345b37..cc8b699 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/DataStreamPojoITCase.java
@@ -59,6 +59,7 @@ public class DataStreamPojoITCase extends StreamingMultipleProgramsTestBase {
 				.sum("sum")
 				.keyBy("aaa", "abc", "wxyz")
 				.flatMap(new FlatMapFunction<Data, Data>() {
+					private static final long serialVersionUID = 788865239171396315L;
 					Data[] first = new Data[3];
 					@Override
 					public void flatMap(Data value, Collector<Data> out) throws Exception {
@@ -105,6 +106,7 @@ public class DataStreamPojoITCase extends StreamingMultipleProgramsTestBase {
 				.sum("sum")
 				.keyBy("aaa", "stats.count")
 				.flatMap(new FlatMapFunction<Data, Data>() {
+					private static final long serialVersionUID = -3678267280397950258L;
 					Data[] first = new Data[3];
 					@Override
 					public void flatMap(Data value, Collector<Data> out) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index d69c140..d693aaa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -46,6 +46,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;

http://git-wip-us.apache.org/repos/asf/flink/blob/ec975aab/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index 032c8fe..fc90994 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.apache.flink.test.util.TestBaseUtils;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -56,7 +57,7 @@ import static org.junit.Assert.fail;
 
 import static org.apache.flink.test.util.TestBaseUtils.getFromHTTP;
 
-public class WebFrontendITCase {
+public class WebFrontendITCase extends TestLogger {
 
 	private static final int NUM_TASK_MANAGERS = 2;
 	private static final int NUM_SLOTS = 4;


Mime
View raw message