flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [59/82] [abbrv] incubator-flink git commit: Adapted test cases to actor model after rebasing.
Date Thu, 18 Dec 2014 18:45:55 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 7722d8a..b73f961 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -36,17 +36,26 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
+import akka.actor.ActorRef;
+import akka.dispatch.Futures;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.Assert;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 public abstract class AbstractTestBase {
 	protected static final int MINIMUM_HEAP_SIZE_MB = 192;
@@ -59,7 +68,7 @@ public abstract class AbstractTestBase {
 
 	protected final Configuration config;
 	
-	protected LocalFlinkMiniCluster executor;
+	protected TestingCluster executor;
 
 	private final List<File> tempFiles;
 
@@ -67,10 +76,15 @@ public abstract class AbstractTestBase {
 
 	protected int numTaskManagers = DEFAULT_NUM_TASK_MANAGERS;
 
+	private final FiniteDuration timeout;
+
 	public AbstractTestBase(Configuration config) {
 		verifyJvmOptions();
 		this.config = config;
 		this.tempFiles = new ArrayList<File>();
+
+		timeout = new FiniteDuration(config.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
+				ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS);
 	}
 
 	private void verifyJvmOptions() {
@@ -90,7 +104,7 @@ public abstract class AbstractTestBase {
 		config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
 		config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
-		this.executor = new LocalFlinkMiniCluster(config);
+		this.executor = new TestingCluster(config);
 	}
 	
 	public void stopCluster() throws Exception {
@@ -101,13 +115,23 @@ public abstract class AbstractTestBase {
 			int numActiveConnections = 0;
 
 			{
-				TaskManager[] tms = executor.getTaskManagers();
+				List<ActorRef> tms = executor.getTaskManagersAsJava();
+				List<Future<Object>> responseFutures = new ArrayList<Future<Object>>();
 
-				if (tms != null) {
-					for (TaskManager tm : tms) {
-						numUnreleasedBCVars += tm.getBroadcastVariableManager().getNumberOfVariablesWithReferences();
-						numActiveConnections += tm.getChannelManager().getNetworkConnectionManager().getNumberOfActiveConnections();
-					}
+				for(ActorRef tm: tms){
+					responseFutures.add(Patterns.ask(tm, TestingTaskManagerMessages
+							.RequestBroadcastVariablesWithReferences$.MODULE$, new Timeout
+							(timeout)));
+				}
+
+				Future<Iterable<Object>> futureResponses = Futures.sequence(
+						responseFutures, AkkaUtils.globalExecutionContext());
+
+				Iterable<Object> responses = Await.result(futureResponses, timeout);
+
+				for(Object response: responses) {
+					numUnreleasedBCVars += ((TestingTaskManagerMessages
+							.ResponseBroadcastVariablesWithReferences) response).number();
 				}
 			}
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
index ca3d294..83dd73b 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/JavaProgramTestBase.java
@@ -31,7 +31,7 @@ import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.flink.api.java.CollectionEnvironment;
@@ -194,12 +194,12 @@ public abstract class JavaProgramTestBase extends AbstractTestBase {
 	
 	private static final class TestEnvironment extends ExecutionEnvironment {
 
-		private final LocalFlinkMiniCluster executor;
+		private final FlinkMiniCluster executor;
 
 		private JobExecutionResult latestResult;
 		
 		
-		private TestEnvironment(LocalFlinkMiniCluster executor, int degreeOfParallelism) {
+		private TestEnvironment(FlinkMiniCluster executor, int degreeOfParallelism) {
 			this.executor = executor;
 			setDegreeOfParallelism(degreeOfParallelism);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/bd4ee47b/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
index 0d27f8d..e8b716b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/util/FailingTestBase.java
@@ -19,7 +19,7 @@
 package org.apache.flink.test.util;
 
 import akka.actor.ActorRef;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
 import org.junit.Assert;
 
 import org.apache.flink.runtime.client.JobClient;
@@ -120,7 +120,7 @@ public abstract class FailingTestBase extends RecordAPITestBase {
 		// reference to the timeout thread
 		private final Thread timeoutThread;
 		// cluster to submit the job to.
-		private final LocalFlinkMiniCluster executor;
+		private final FlinkMiniCluster executor;
 		// job graph of the failing job (submitted first)
 		private final JobGraph failingJob;
 		// job graph of the working job (submitted after return from failing job)
@@ -129,7 +129,7 @@ public abstract class FailingTestBase extends RecordAPITestBase {
 		private volatile Exception error;
 		
 
-		public SubmissionThread(Thread timeoutThread, LocalFlinkMiniCluster executor, JobGraph
failingJob,
+		public SubmissionThread(Thread timeoutThread, FlinkMiniCluster executor, JobGraph failingJob,
 								JobGraph job) {
 			this.timeoutThread = timeoutThread;
 			this.executor = executor;


Mime
View raw message