flink-commits mailing list archives

From trohrm...@apache.org
Subject [49/82] [abbrv] incubator-flink git commit: Removed dead instance cleanup from InstanceManager so that Akka's watch mechanism is the current means to detect dead instances.
Date Thu, 18 Dec 2014 18:45:45 GMT
Removed dead instance cleanup from InstanceManager so that Akka's watch mechanism is the current
means to detect dead instances.
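
For context: "Akka's watch mechanism" refers to DeathWatch. Instead of a timer that periodically scans registered instances for missed heartbeats, the JobManager watches each TaskManager actor and reacts to the Terminated message that Akka delivers once its remote failure detector gives up on the watched actor. A minimal Scala sketch of that pattern (hypothetical actor and message names, not the actual JobManager code):

    import akka.actor.{Actor, ActorRef, Terminated}

    case class RegisterTaskManager(taskManager: ActorRef)

    class WatchingJobManager extends Actor {
      def receive = {
        case RegisterTaskManager(taskManager) =>
          // DeathWatch: from now on this actor receives Terminated(taskManager)
          // once the remote failure detector declares the watched actor dead.
          context.watch(taskManager)

        case Terminated(taskManager) =>
          // Replaces the periodic heartbeat scan: unregister the dead instance here.
          println(s"Task manager ${taskManager.path} terminated.")
      }
    }

In the diff below, the corresponding Terminated handler in JobManager.scala calls instanceManager.unregisterTaskManager, and the Timer-driven checkForDeadInstances loop is removed from InstanceManager.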


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/8d414d7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/8d414d7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/8d414d7e

Branch: refs/heads/master
Commit: 8d414d7eb534bf5defa4aadd9b4f0a12a0be7ca8
Parents: 8eadd3e
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Mon Nov 17 18:33:45 2014 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Dec 18 18:58:31 2014 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/program/Client.java |   1 +
 .../flink/configuration/ConfigConstants.java    |  10 +-
 flink-runtime/pom.xml                           |   6 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   3 +-
 .../apache/flink/runtime/instance/Instance.java |  27 +---
 .../flink/runtime/instance/InstanceManager.java |  85 +------------
 .../org/apache/flink/runtime/net/NetUtils.java  |   8 +-
 .../runtime/operators/RegularPactTask.java      |   1 -
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 125 ++++++++++++++-----
 .../flink/runtime/akka/KryoInitializer.scala    |  26 ++++
 .../flink/runtime/jobmanager/JobManager.scala   |   4 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  13 +-
 .../runtime/instance/InstanceManagerTest.java   |  79 ------------
 .../scheduler/SchedulerIsolatedTasksTest.java   |   6 -
 .../apache/flink/api/scala/ClosureCleaner.scala |   2 +-
 .../test/cancelling/CancellingTestBase.java     |   2 -
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../PartitionOperatorTranslationTest.scala      |   2 +-
 .../scala/functions/ClosureCleanerITCase.scala  |   2 +-
 .../misc/MassiveCaseClassSortingITCase.scala    |   2 +-
 .../CoGroupCustomPartitioningTest.scala         |   2 +-
 .../CoGroupGroupSortTranslationTest.scala       |   2 +-
 ...tomPartitioningGroupingKeySelectorTest.scala |   2 +-
 .../CustomPartitioningGroupingPojoTest.scala    |   2 +-
 .../CustomPartitioningGroupingTupleTest.scala   |   2 +-
 .../translation/CustomPartitioningTest.scala    |   2 +-
 .../JoinCustomPartitioningTest.scala            |   2 +-
 .../translation/PartitioningTestClasses.scala   |  12 +-
 .../scala/runtime/CaseClassComparatorTest.scala |   2 +-
 pom.xml                                         |   6 +
 30 files changed, 180 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 45848c2..c4e71ef 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -54,6 +54,7 @@ import com.google.common.base.Preconditions;
 
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import scala.Tuple2;
 import scala.concurrent.duration.FiniteDuration;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index f0ab180..ab1ed78 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -578,17 +578,17 @@ public final class ConfigConstants {
 	
 	// ------------------------------ Akka Values ------------------------------
 
-	public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 ms";
+	public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "5000 ms";
 
-	public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "10 s";
+	public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_PAUSE = "100 s";
 
 	public static double DEFAULT_AKKA_TRANSPORT_THRESHOLD = 300.0;
 
-	public static String DEFAULT_AKKA_WATCH_HEARTBEAT_INTERVAL = "1000 ms";
+	public static String DEFAULT_AKKA_WATCH_HEARTBEAT_INTERVAL = "5000 ms";
 
-	public static String DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE = "10 s";
+	public static String DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE = "100 s";
 
-	public static double DEFAULT_AKKA_WATCH_THRESHOLD = 10.0;
+	public static double DEFAULT_AKKA_WATCH_THRESHOLD = 300.0;
 
 	public static String DEFAULT_AKKA_TCP_TIMEOUT = "15 s";
 
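
These relaxed defaults are consumed by the Akka transport and watch failure detectors that the reworked AkkaUtils configuration (further down in this commit) hands to the actor system. A rough sketch of where the watch-related values end up, assuming they are interpolated into HOCON the same way AkkaUtils does it (illustrative only, not the actual Flink wiring):

    import com.typesafe.config.{Config, ConfigFactory}
    import org.apache.flink.configuration.ConfigConstants

    object WatchDefaultsSketch {
      // Resolves to: heartbeat-interval = 5000 ms, acceptable-heartbeat-pause = 100 s, threshold = 300.0
      val watchFailureDetector: Config = ConfigFactory.parseString(
        s"""
           |akka.remote.watch-failure-detector {
           |  heartbeat-interval = ${ConfigConstants.DEFAULT_AKKA_WATCH_HEARTBEAT_INTERVAL}
           |  acceptable-heartbeat-pause = ${ConfigConstants.DEFAULT_AKKA_WATCH_HEARTBEAT_PAUSE}
           |  threshold = ${ConfigConstants.DEFAULT_AKKA_WATCH_THRESHOLD}
           |}
         """.stripMargin)
    }

With a 5 s heartbeat interval and a 100 s acceptable pause, a TaskManager has to stay silent for well over a minute before DeathWatch marks it dead; the old 1 s / 10 s defaults were far more aggressive.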

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index f04475c..b7edf7a 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -123,6 +123,11 @@ under the License.
 		</dependency>
 
 		<dependency>
+			<groupId>com.github.romix.akka</groupId>
+			<artifactId>akka-kryo-serialization_2.10</artifactId>
+		</dependency>
+
+		<dependency>
 			<groupId>org.scalatest</groupId>
 			<artifactId>scalatest_2.10</artifactId>
 		</dependency>
@@ -289,7 +294,6 @@ under the License.
 					<systemPropertyVariables>
 						<log.level>WARN</log.level>
 					</systemPropertyVariables>
-					<reuseForks>false</reuseForks>
 				</configuration>
 			</plugin>
 			<plugin>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 8faa235..7d38eac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import java.io.Serializable;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -58,7 +59,7 @@ import org.apache.flink.util.ExceptionUtils;
 import static akka.dispatch.Futures.future;
 
 
-public class ExecutionGraph {
+public class ExecutionGraph implements Serializable {
 
 	private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 18d3212..aaa276d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -116,32 +116,7 @@ public class Instance {
 	public boolean isAlive() {
 		return !isDead;
 	}
-	
-	public void stopInstance() {
-		try {
-			final TaskOperationProtocol tmProxy = this.getTaskManagerProxy();
-			// start a thread for stopping the TM to avoid infinitive blocking.
-			Runnable r = new Runnable() {
-				@Override
-				public void run() {
-					try {
-						tmProxy.killTaskManager();
-					} catch (IOException e) {
-						if (Log.isDebugEnabled()) {
-							Log.debug("Error while stopping TaskManager", e);
-						}
-					}
-				}
-			};
-			Thread t = new Thread(r);
-			t.setDaemon(true); // do not prevent the JVM from stopping
-			t.start();
-		} catch (Exception e) {
-			if (Log.isDebugEnabled()) {
-				Log.debug("Error while stopping TaskManager", e);
-			}
-		}
-	}
+
 	public void markDead() {
 		if (isDead) {
 			return;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index 10c89e4..3ce3ac7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -22,12 +22,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
 
 import akka.actor.ActorRef;
 import org.apache.flink.configuration.ConfigConstants;
@@ -97,23 +94,11 @@ public class InstanceManager {
 		this.registeredHostsByConnection = new HashMap<ActorRef, Instance>();
 		this.deadHosts = new HashSet<ActorRef>();
 		this.heartbeatTimeout = heartbeatTimeout;
-
-		new Timer(true).schedule(cleanupStaleMachines, cleanupInterval, cleanupInterval);
 	}
 	
 	public long getHeartbeatTimeout() {
 		return heartbeatTimeout;
 	}
-	
-	/**
-	 * This method is only used by the Flink YARN client to self-destruct a Flink cluster
-	 * by stopping the JVMs of the TaskManagers.
-	 */
-	public void killTaskManagers() {
-		for (Instance i : this.registeredHostsById.values()) {
-			i.stopInstance();
-		}
-	}
 
 	public void shutdown() {
 		synchronized (this.lock) {
@@ -122,8 +107,6 @@ public class InstanceManager {
 			}
 			this.shutdown = true;
 
-			this.cleanupStaleMachines.cancel();
-
 			for (Instance i : this.registeredHostsById.values()) {
 				i.markDead();
 			}
@@ -213,7 +196,7 @@ public class InstanceManager {
 
 		if(host != null){
 			registeredHostsByConnection.remove(taskManager);
-			registeredHostsById.remove(taskManager);
+			registeredHostsById.remove(host.getId());
 			deadHosts.add(taskManager);
 
 			host.markDead();
@@ -221,6 +204,10 @@ public class InstanceManager {
 			totalNumberOfAliveTaskSlots -= host.getTotalNumberOfSlots();
 
 			notifyDeadInstance(host);
+
+			LOG.info("Unregistered task manager " + taskManager.path().address() + ". Number of " +
+					"registered task managers " + getNumberOfRegisteredTaskManagers() + ". Number" +
+					" of available slots " + getTotalNumberOfSlots() + ".");
 		}
 	}
 
@@ -272,70 +259,10 @@ public class InstanceManager {
 			for (InstanceListener listener : this.instanceListeners) {
 				try {
 					listener.instanceDied(instance);
-				}
-				catch (Throwable t) {
+				} catch (Throwable t) {
 					LOG.error("Notification of dead instance failed.", t);
 				}
 			}
 		}
 	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private void checkForDeadInstances() {
-		final long now = System.currentTimeMillis();
-		final long timeout = InstanceManager.this.heartbeatTimeout;
-		
-		synchronized (InstanceManager.this.lock) {
-			if (InstanceManager.this.shutdown) {
-				return;
-			}
-
-			final Iterator<Map.Entry<InstanceID, Instance>> entries = registeredHostsById.entrySet().iterator();
-			
-			// check all hosts whether they did not send heart-beat messages.
-			while (entries.hasNext()) {
-				
-				final Map.Entry<InstanceID, Instance> entry = entries.next();
-				final Instance host = entry.getValue();
-				
-				if (!host.isStillAlive(now, timeout)) {
-					
-					// remove from the living
-					entries.remove();
-					registeredHostsByConnection.remove(host.getTaskManager());
-
-					// add to the dead
-					deadHosts.add(host.getTaskManager());
-					
-					host.markDead();
-					
-					totalNumberOfAliveTaskSlots -= host.getTotalNumberOfSlots();
-					
-					LOG.info(String.format("TaskManager %s at %s did not report a heartbeat for %d msecs - marking as dead. Current number of registered hosts is %d.",
-							host.getId(), host.getPath(), heartbeatTimeout, registeredHostsById.size()));
-					
-					// report to all listeners
-					notifyDeadInstance(host);
-				}
-			}
-		}
-	}
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Periodic task that checks whether hosts have not sent their heart-beat
-	 * messages and purges the hosts in this case.
-	 */
-	private final TimerTask cleanupStaleMachines = new TimerTask() {
-		@Override
-		public void run() {
-			try {
-				checkForDeadInstances();
-			}
-			catch (Throwable t) {
-				LOG.error("Checking for dead instances failed.", t);
-			}
-		}
-	};
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
index 2cd929b..2795158 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
@@ -367,6 +367,9 @@ public class NetUtils {
 							}
 							break;
 						case HEURISTIC:
+							LOG.debug("ResolveAddress using heuristic strategy for " + i + " with" +
+									"isLinkLocalAddress:" + i.isLinkLocalAddress() +" " +
+									"isLoopbackAddress:" + i.isLoopbackAddress() + ".");
 							if(!i.isLinkLocalAddress() && !i.isLoopbackAddress() && i instanceof Inet4Address){
 								LOG.warn("Hostname " + InetAddress.getLocalHost().getHostName() + " resolves to " +
 										"loopback address. Using instead " + i.getHostAddress() + " on network " +
@@ -389,13 +392,16 @@ public class NetUtils {
 					break;
 				case SLOW_CONNECT:
 					if(!InetAddress.getLocalHost().isLoopbackAddress()){
+						LOG.info("Heuristically taking " + InetAddress.getLocalHost() + " as own " +
+								"IP address.");
 						return InetAddress.getLocalHost();
 					}else {
 						strategy = AddressDetectionState.HEURISTIC;
 						break;
 					}
 				case HEURISTIC:
-					throw new RuntimeException("The TaskManager is unable to connect to the JobManager (Address: '"+jobManagerAddress+"').");
+					throw new RuntimeException("Unable to resolve own inet address by connecting " +
+							"to address (" + jobManagerAddress + ").");
 			}
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("Defaulting to detection strategy " + strategy);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 5f520e3..9ea4a74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -1303,7 +1303,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 					final TypeComparator<T> comparator = compFactory.createComparator();
 					oe = new OutputEmitter<T>(strategy, comparator, partitioner, dataDist);
 				}
-
 				writers.add(new RecordWriter<SerializationDelegate<T>>(task, oe));
 			}
 			if (eventualOutputs != null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index f931497..168dccb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -78,42 +78,105 @@ object AkkaUtils {
     val logLevel = configuration.getString(ConfigConstants.AKKA_LOG_LEVEL,
       ConfigConstants.DEFAULT_AKKA_LOG_LEVEL)
 
-    val configString = s"""akka.remote.transport-failure-detector.heartbeat-interval =
-                       $transportHeartbeatInterval
-       |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $transportHeartbeatPause
-       |akka.remote.transport-failure-detector.threshold = $transportThreshold
-       |akka.remote.watch-failure-detector.heartbeat-interval = $watchHeartbeatInterval
-       |akka.remote.watch-failure-detector.acceptable-heartbeat-pause = $watchHeartbeatPause
-       |akka.remote.wathc-failure-detector.threshold = $watchThreshold
-       |akka.remote.netty.tcp.hostname = $host
-       |akka.remote.netty.tcp.port = $port
-       |akka.remote.netty.tcp.connection-timeout = $akkaTCPTimeout
-       |akka.remote.netty.tcp.maximum-frame-size = $akkaFramesize
-       |akka.actor.default-dispatcher.throughput = $akkaThroughput
-       |akka.remote.log-remote-lifecycle-events = $logLifecycleEvents
-       |akka.log-dead-letters = $logLifecycleEvents
-       |akka.log-dead-letters-during-shutdown = $logLifecycleEvents
-       |akka.loglevel = "$logLevel"
-       |akka.stdout-loglevel = "$logLevel"
-     """.stripMargin
+    val configString =
+      s"""
+         |akka {
+         |  loglevel = "$logLevel"
+         |  stdout-loglevel = "$logLevel"
+         |
+         |  log-dead-letters = $logLifecycleEvents
+         |  log-dead-letters-during-shutdown = $logLifecycleEvents
+         |
+         |  extensions = ["com.romix.akka.serialization.kryo.KryoSerializationExtension$$"]
+         |
+         |  remote {
+         |    transport-failure-detector{
+         |      acceptable-heartbeat-pause = $transportHeartbeatPause
+         |      threshold = $transportThreshold
+         |      heartbeat-interval = $transportHeartbeatInterval
+         |    }
+         |
+         |    watch-failure-detector{
+         |      heartbeat-interval = $watchHeartbeatInterval
+         |      acceptable-heartbeat-pause = $watchHeartbeatPause
+         |      threshold = $watchThreshold
+         |    }
+         |
+         |    netty{
+         |      tcp{
+         |        hostname = $host
+         |        port = $port
+         |        connection-timeout = $akkaTCPTimeout
+         |        maximum-frame-size = $akkaFramesize
+         |      }
+         |    }
+         |
+         |    log-remote-lifecycle-events = $logLifecycleEvents
+         |
+         |  }
+         |
+         |  actor{
+         |    default-dispatcher{
+         |      throughput = $akkaThroughput
+         |    }
+         |
+         |    kryo{
+         |      type = "nograph"
+         |      idstrategy = "default"
+         |      serializer-pool-size = 16
+         |      buffer-size = 4096
+         |      max-buffer-size = -1
+         |      use-manifests = false
+         |      compression = off
+         |      implicit-registration-logging = true
+         |      kryo-trace = true
+         |      kryo-custom-serializer-init = "org.apache.flink.runtime.akka.KryoInitializer"
+         |    }
+         |
+         |    serialize-messages = on
+         |
+         |    serializers{
+         |      kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
+         |    }
+         |
+         |    serialization-bindings {
+         |    }
+         |  }
+         |}
+       """.stripMargin
 
     getDefaultActorSystemConfigString + configString
   }
 
   def getDefaultActorSystemConfigString: String = {
-    s"""akka.daemonic = on
-      |akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
-      |akka.loglevel = "WARNING"
-      |akka.logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
-      |akka.stdout-loglevel = "WARNING"
-      |akka.jvm-exit-on-fatal-error = off
-      |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
-      |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
-      |akka.remote.netty.tcp.tcp-nodelay = on
-      |akka.log-config-on-start = off
-      |akka.remote.netty.tcp.port = 0
-      |akka.remote.netty.tcp.maximum-frame-size = 1MB
-    """.stripMargin
+    s"""
+       |akka {
+       |  daemonic = on
+       |
+       |  loggers = ["akka.event.slf4j.Slf4jLogger"]
+       |  loglevel = "WARNING"
+       |  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
+       |  stdout-loglevel = "WARNING"
+       |  jvm-exit-on-fatal-error = off
+       |  log-config-on-start = off
+       |
+       |  actor {
+       |    provider = "akka.remote.RemoteActorRefProvider"
+       |  }
+       |
+       |  remote{
+       |    netty{
+       |      tcp{
+       |        transport-class = "akka.remote.transport.netty.NettyTransport"
+       |        tcp-nodelay = on
+       |
+       |        port = 0
+       |        maximum-frame-size = 1MB
+       |      }
+       |    }
+       |  }
+       |}
+     """.stripMargin
   }
 
   def getDefaultActorSystemConfig = {
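
The strings assembled above are plain HOCON, so the actor system is created by parsing them with Typesafe Config. A minimal, self-contained sketch of that step (hard-coded values purely for illustration, not the actual Flink bootstrap code):

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    object AkkaConfigSketch extends App {
      // Stands in for getDefaultActorSystemConfigString + configString assembled above.
      val configString =
        """
          |akka {
          |  daemonic = on
          |  actor.provider = "akka.remote.RemoteActorRefProvider"
          |  remote.netty.tcp.hostname = "localhost"
          |  remote.netty.tcp.port = 0
          |}
        """.stripMargin

      // Akka falls back to its reference.conf for everything the string leaves out.
      val actorSystem = ActorSystem("flink", ConfigFactory.parseString(configString))
    }

Port 0 lets the transport pick a free port; the remote provider additionally needs akka-remote on the classpath.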

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala
new file mode 100644
index 0000000..5f9854b
--- /dev/null
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/KryoInitializer.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka
+
+import com.esotericsoftware.kryo.Kryo
+
+class KryoInitializer {
+  def cystomize(kryo: Kryo): Unit = {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index a72c685..a18240e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -108,7 +108,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
         hardwareInformation, numberOfSlots)
 
       // to be notified when the taskManager is no longer reachable
-      context.watch(taskManager);
+//      context.watch(taskManager);
 
       taskManager ! AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)
     }
@@ -381,7 +381,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
     case Terminated(taskManager) => {
       log.info(s"Task manager ${taskManager.path} terminated.")
       instanceManager.unregisterTaskManager(taskManager)
-      context.unwatch(taskManager)
+//      context.unwatch(taskManager)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a145689..261d50a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -79,7 +79,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
   val REGISTRATION_DELAY = 0 seconds
   val REGISTRATION_INTERVAL = 10 seconds
   val MAX_REGISTRATION_ATTEMPTS = 10
-  val HEARTBEAT_INTERVAL = 1000 millisecond
+  val HEARTBEAT_INTERVAL = 5000 millisecond
 
   TaskManager.checkTempDirs(tmpDirPaths)
   val ioManager = new IOManagerAsync(tmpDirPaths)
@@ -185,7 +185,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
         currentJobManager = sender()
         instanceID = id
 
-        context.watch(currentJobManager)
+//        context.watch(currentJobManager)
 
         log.info(s"TaskManager successfully registered at JobManager ${
           currentJobManager.path
@@ -232,9 +232,18 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo, val jobManagerAkka
       val taskIndex = tdd.getIndexInSubtaskGroup
       val numSubtasks = tdd.getCurrentNumberOfSubtasks
       var jarsRegistered = false
+      var startRegisteringTask = 0L
 
       try {
+        if(log.isDebugEnabled){
+          startRegisteringTask = System.currentTimeMillis()
+        }
         libraryCacheManager.registerTask(jobID, executionID, tdd.getRequiredJarFiles());
+
+        if(log.isDebugEnabled){
+          log.debug(s"Register task ${executionID} took ${(System.currentTimeMillis() -
+            startRegisteringTask)/1000.0}s")
+        }
         jarsRegistered = true
 
         val userCodeClassLoader = libraryCacheManager.getClassLoader(jobID)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index 1f63588..8a89503 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -235,83 +235,4 @@ public class InstanceManagerTest{
 			Assert.fail("Test erroneous: " + e.getMessage());
 		}
 	}
-
-	/**
-	 * This test checks the clean-up routines of the cluster manager.
-	 */
-	@Test
-	public void testCleanUp() {
-		try {
-			InstanceManager cm = new InstanceManager(200, 100);
-
-			HardwareDescription resources = HardwareDescription.extractFromSystem(4096);
-			InetAddress address = InetAddress.getByName("127.0.0.1");
-			InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, 20000);
-			InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, 20001);
-
-			JavaTestKit probe1 = new JavaTestKit(system);
-			JavaTestKit probe2 = new JavaTestKit(system);
-			// register two instances
-			InstanceID i1 = cm.registerTaskManager(probe1.getRef(), ici1, resources, 1);
-			InstanceID i2 = cm.registerTaskManager(probe2.getRef(), ici2, resources, 1);
-
-			assertNotNull(i1);
-			assertNotNull(i2);
-			
-			assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
-			assertEquals(2, cm.getTotalNumberOfSlots());
-
-			// report a few heatbeats for both of the machines (each 50 msecs)...
-			for (int i = 0; i < 8; i++) {
-				CommonTestUtils.sleepUninterruptibly(50);
-				
-				assertTrue(cm.reportHeartBeat(i1));
-				assertTrue(cm.reportHeartBeat(i2));
-			}
-			
-			// all should be alive
-			assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
-			assertEquals(2, cm.getTotalNumberOfSlots());
-
-			// report a few heatbeats for both only one machine
-			for (int i = 0; i < 8; i++) {
-				CommonTestUtils.sleepUninterruptibly(50);
-				
-				assertTrue(cm.reportHeartBeat(i1));
-			}
-			
-			// we should have lost one TM by now
-			assertEquals(1, cm.getNumberOfRegisteredTaskManagers());
-			assertEquals(1, cm.getTotalNumberOfSlots());
-			
-			// if the lost TM reports, it should not be accepted
-			assertFalse(cm.reportHeartBeat(i2));
-			
-			// allow the lost TM to re-register itself
-			i2 = cm.registerTaskManager(probe2.getRef(), ici2, resources, 1);
-			assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
-			assertEquals(2, cm.getTotalNumberOfSlots());
-			
-			// report a few heatbeats for both of the machines (each 50 msecs)...
-			for (int i = 0; i < 8; i++) {
-				CommonTestUtils.sleepUninterruptibly(50);
-				
-				assertTrue(cm.reportHeartBeat(i1));
-				assertTrue(cm.reportHeartBeat(i2));
-			}
-			
-			// all should be alive
-			assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
-			assertEquals(2, cm.getTotalNumberOfSlots());
-
-			
-			cm.shutdown();
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			Assert.fail("Test erroneous: " + e.getMessage());
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index 240cdac..9418d77 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -38,14 +38,11 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flink.runtime.instance.AllocatedSlot;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.util.ExecutorThreadFactory;
 
 /**
  * Tests for the {@link Scheduler} when scheduling individual tasks.
@@ -284,9 +281,6 @@ public class SchedulerIsolatedTasksTest {
 			// the slots should all be different
 			assertTrue(areAllDistinct(slotsAfter.toArray()));
 			
-			executor.shutdown();
-			executor.awaitTermination(30, TimeUnit.SECONDS);
-			
 			assertEquals(totalSlots, scheduler.getNumberOfAvailableSlots());
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
index a3c564a..9740c82 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
index 303ee3d..63ca29d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
@@ -121,8 +121,6 @@ public abstract class CancellingTestBase {
 			actorSystem.scheduler().scheduleOnce(new FiniteDuration(msecsTillCanceling,
 							TimeUnit.MILLISECONDS), client, new JobManagerMessages.CancelJob(jobGraph.getJobID()),
 					actorSystem.dispatcher(), ActorRef.noSender());
-						case RESTARTING:
-							throw new IllegalStateException("Job restarted");
 
 			try {
 				Await.result(result, AkkaUtils.DEFAULT_TIMEOUT());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/log4j-test.properties b/flink-tests/src/test/resources/log4j-test.properties
index 0b686e5..2c2d4ff 100644
--- a/flink-tests/src/test/resources/log4j-test.properties
+++ b/flink-tests/src/test/resources/log4j-test.properties
@@ -17,7 +17,7 @@
 ################################################################################
 
 # Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=OFF, A1
+log4j.rootLogger=DEBUG, A1
 
 # A1 is set to be a ConsoleAppender.
 log4j.appender.A1=org.apache.log4j.ConsoleAppender

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
index 1e44413..a83d728 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/compiler/PartitionOperatorTranslationTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
index a063957..0d6b763 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
index d09fe60..a34c7d8 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
index 1c6afba..bd254fe 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupCustomPartitioningTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
index 7304310..9535173 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CoGroupGroupSortTranslationTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
index 17ecc3f..93e3593 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingKeySelectorTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
index 8ffba8e..04a6285 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingPojoTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
index b5f266f..e5b6c5f 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningGroupingTupleTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
index d4e438f..1dcf181 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/CustomPartitioningTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
index 8cb49b8..d83d3be 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/JoinCustomPartitioningTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala
index bcf1869..2a25be4 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/translation/PartitioningTestClasses.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,7 @@
 
 package org.apache.flink.api.scala.operators.translation
 
-import org.junit.Assert._
-import org.junit.Test
 import org.apache.flink.api.common.functions.Partitioner
-import org.apache.flink.api.scala._
-import org.apache.flink.test.compiler.util.CompilerTestBase
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType
-import org.apache.flink.compiler.plan.SingleInputPlanNode
-import org.apache.flink.api.common.operators.Order
-import org.apache.flink.api.common.InvalidProgramException
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
-import org.apache.flink.compiler.plan.DualInputPlanNode
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
index 24dbfe5..d150e85 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassComparatorTest.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/8d414d7e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 58bf3db..6739948 100644
--- a/pom.xml
+++ b/pom.xml
@@ -282,6 +282,12 @@ under the License.
 			</dependency>
 
 			<dependency>
+				<groupId>com.github.romix.akka</groupId>
+				<artifactId>akka-kryo-serialization_2.10</artifactId>
+				<version>0.3.2</version>
+			</dependency>
+
+			<dependency>
 				<groupId>org.scalatest</groupId>
 				<artifactId>scalatest_2.10</artifactId>
 				<version>2.2.2</version>

