flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [1/2] flink git commit: [FLINK-1660] [streaming] [tests] Fix MultiTriggerPolicyTest race
Date Tue, 14 Apr 2015 13:02:13 GMT
Repository: flink
Updated Branches:
  refs/heads/master f50d75411 -> e5a3b95a2


[FLINK-1660] [streaming] [tests] Fix MultiTriggerPolicyTest race

Closes #590


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5a3b95a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5a3b95a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5a3b95a

Branch: refs/heads/master
Commit: e5a3b95a262a0498f9d6dc7d47495c95719c5632
Parents: 54d5662
Author: Ufuk Celebi <uce@apache.org>
Authored: Mon Apr 13 12:16:16 2015 +0200
Committer: mbalassi <mbalassi@apache.org>
Committed: Tue Apr 14 15:01:22 2015 +0200

----------------------------------------------------------------------
 .../flink-streaming-core/pom.xml                | 12 +++-
 .../policy/MultiTriggerPolicyTest.java          | 64 +++++++++++---------
 2 files changed, 44 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e5a3b95a/flink-staging/flink-streaming/flink-streaming-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/pom.xml b/flink-staging/flink-streaming/flink-streaming-core/pom.xml
index a60bd01..bc6c55c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-core/pom.xml
@@ -48,12 +48,18 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
-        
-	        <dependency>
+
+		<dependency>
 			<groupId>org.apache.sling</groupId>
 			<artifactId>org.apache.sling.commons.json</artifactId>
 			<version>2.0.6</version>
-        	</dependency>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/e5a3b95a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
index 9964cd8..4448b59 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
@@ -17,16 +17,22 @@
 
 package org.apache.flink.streaming.api.windowing.policy;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import com.google.common.collect.Sets;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.junit.Test;
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class MultiTriggerPolicyTest {
 
@@ -135,7 +141,7 @@ public class MultiTriggerPolicyTest {
 	 * correctly.
 	 */
 	@Test
-	public void testActiveTriggerRunnables() {
+	public void testActiveTriggerRunnables() throws InterruptedException {
 		TriggerPolicy<Integer> firstPolicy = new ActiveTriggerWithRunnable(1);
 		TriggerPolicy<Integer> secondPolicy = new ActiveTriggerWithRunnable(2);
 		TriggerPolicy<Integer> thirdPolicy = new ActiveTriggerWithRunnable(3);
@@ -143,7 +149,7 @@ public class MultiTriggerPolicyTest {
 		ActiveTriggerPolicy<Integer> multiTrigger = new MultiTriggerPolicy<Integer>(firstPolicy,
 				secondPolicy, thirdPolicy);
 
-		MyCallbackClass cb = new MyCallbackClass();
+		MyCallbackClass cb = new MyCallbackClass(3);
 		Runnable runnable = multiTrigger.createActiveTriggerRunnable(cb);
 		new Thread(runnable).start();
 
@@ -168,7 +174,7 @@ public class MultiTriggerPolicyTest {
 	@SuppressWarnings("serial")
 	private class ActiveTriggerWithRunnable implements ActiveTriggerPolicy<Integer> {
 
-		int id;
+		private final int id;
 
 		public ActiveTriggerWithRunnable(int id) {
 			this.id = id;
@@ -203,37 +209,37 @@ public class MultiTriggerPolicyTest {
 	 */
 	private class MyCallbackClass implements ActiveTriggerCallback {
 
-		List<Integer> received = new LinkedList<Integer>();
+		private final Set<Integer> received = Sets
+				.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
+
+		private final CountDownLatch sync;
+
+		public MyCallbackClass(int numberOfExpectedElements) {
+			checkArgument(numberOfExpectedElements >= 0);
+			this.sync = new CountDownLatch(numberOfExpectedElements);
+		}
 
 		@Override
 		public void sendFakeElement(Object datapoint) {
 			received.add((Integer) datapoint);
+
+			sync.countDown();
 		}
 
-		public boolean check(int timeout, int... ids) {
-			int totalTime = 0;
+		public boolean check(int timeout, int... expectedIds) throws InterruptedException {
+			// Wait for all elements
+			sync.await(timeout, TimeUnit.MILLISECONDS);
 
-			while (totalTime <= timeout) {
-				boolean result = true;
-				for (int id : ids) {
-					if (!received.contains(id)) {
-						result = false;
-					}
-				}
+			// Check received all expected ids
+			assertEquals(expectedIds.length, received.size());
 
-				if (result) {
-					return true;
-				} else {
-					try {
-						Thread.sleep(1000);
-						totalTime += 1000;
-					} catch (InterruptedException e) {
-						// ignore it here
-					}
+			for (int id : expectedIds) {
+				if (!received.contains(id)) {
+					return false;
 				}
 			}
-			return false;
-		}
 
+			return true;
+		}
 	}
 }
\ No newline at end of file


Mime
View raw message