flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [4/6] flink git commit: [jobmanager] Add a process reaper to kill the JobManager process when the main actor dies.
Date Tue, 24 Feb 2015 12:31:11 GMT
[jobmanager] Add a process reaper to kill the JobManager process when the main actor dies.

Also adds various tests for failure behavior during job submission.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5725c721
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5725c721
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5725c721

Branch: refs/heads/master
Commit: 5725c72129fe9a75487204b470a30e850ad4091c
Parents: 9ddb565
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Feb 23 23:35:10 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Feb 24 10:14:30 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/process/ProcessReaper.java    |  60 ++++++
 .../flink/runtime/jobmanager/JobManager.scala   |  65 ++++--
 .../JobManagerProcessReapingTest.java           | 150 +++++++++++++
 .../jobmanager/JobManagerStartupTest.java       |  23 +-
 .../runtime/jobmanager/JobManagerTest.java      |  48 +++++
 .../flink/runtime/jobmanager/JobSubmitTest.java |  12 +-
 .../runtime/testutils/CommonTestUtils.java      | 127 ++++++-----
 .../testutils/InterruptibleByteChannel.java     | 210 -------------------
 .../runtime/testutils/ServerTestUtils.java      | 181 ----------------
 .../JobSubmissionFailsITCase.java               |  11 +-
 .../apache/flink/yarn/ApplicationMaster.scala   |   2 +-
 11 files changed, 408 insertions(+), 481 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5725c721/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
new file mode 100644
index 0000000..b12b82d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.process;
+
+import akka.actor.ActorRef;
+import akka.actor.Terminated;
+import akka.actor.UntypedActor;
+import org.slf4j.Logger;
+
+/**
+ * Utility actor that monitors other actors and kills the JVM upon
+ * actor termination.
+ */
+public class ProcessReaper extends UntypedActor {
+
+	private final Logger log;
+	private final int exitCode;
+
+	public ProcessReaper(ActorRef watchTarget, Logger log, int exitCode) {
+		if (watchTarget == null || watchTarget.equals(ActorRef.noSender())) {
+			throw new IllegalArgumentException("Target may not be null or 'noSender'");
+		}
+		this.log = log;
+		this.exitCode = exitCode;
+
+		getContext().watch(watchTarget);
+	}
+
+	@Override
+	public void onReceive(Object message) {
+		if (message instanceof Terminated) {
+			try {
+				Terminated term = (Terminated) message;
+				String name = term.actor().path().name();
+				if (log != null) {
+					log.error("Actor " + name + " terminated, stopping process...");
+				}
+			}
+			finally {
+				System.exit(exitCode);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5725c721/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index ce3bc74..a1642b4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.messages.Messages.{Disconnect, Acknowledge}
+import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.taskmanager.TaskManager
@@ -424,7 +425,8 @@ class JobManager(val configuration: Configuration,
 
         val userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)
         if (userCodeLoader == null) {
-          throw new JobSubmissionException(jobId, "The user code class loader could not be
initialized.")
+          throw new JobSubmissionException(jobId,
+            "The user code class loader could not be initialized.")
         }
 
         if (jobGraph.getNumberOfVertices == 0) {
@@ -522,7 +524,14 @@ class JobManager(val configuration: Configuration,
         executionGraph.scheduleForExecution(scheduler)
       }
       catch {
-        case t: Throwable => executionGraph.fail(t);
+        case t: Throwable => try {
+          executionGraph.fail(t)
+        }
+        catch {
+          case tt: Throwable => {
+            log.error(tt, "Error while marking ExecutionGraph as failed.")
+          }
+        }
       }
     }
   }
@@ -574,7 +583,8 @@ object JobManager {
 
   val LOG = LoggerFactory.getLogger(classOf[JobManager])
 
-  val FAILURE_RETURN_CODE = 1
+  val STARTUP_FAILURE_RETURN_CODE = 1
+  val RUNTIME_FAILURE_RETURN_CODE = 2
 
   val JOB_MANAGER_NAME = "jobmanager"
   val EVENT_COLLECTOR_NAME = "eventcollector"
@@ -595,34 +605,44 @@ object JobManager {
 
     val (configuration: Configuration,
          executionMode: ExecutionMode,
-         listeningAddress:  Option[(String, Int)]) =
+         listeningHost: String, listeningPort: Int) =
     try {
       parseArgs(args)
     }
     catch {
       case t: Throwable => {
         LOG.error(t.getMessage(), t)
-        System.exit(FAILURE_RETURN_CODE)
+        System.exit(STARTUP_FAILURE_RETURN_CODE)
         null
       }
     }
 
+    // we may want to check that the JobManager hostname is in the config
+    // if it is not in there, the actor system will bind to the loopback interface's
+    // address and will not be reachable from anyone remote
+    if (listeningHost == null) {
+      val message = "Config parameter '" + ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY +
+        "' is missing (hostname or address to bind JobManager to)."
+      LOG.error(message)
+      System.exit(STARTUP_FAILURE_RETURN_CODE)
+    }
+
     try {
       if (SecurityUtils.isSecurityEnabled) {
         LOG.info("Security is enabled. Starting secure JobManager.")
         SecurityUtils.runSecured(new FlinkSecuredRunner[Unit] {
           override def run(): Unit = {
-            runJobManager(configuration, executionMode, listeningAddress)
+            runJobManager(configuration, executionMode, listeningHost, listeningPort)
           }
         })
       } else {
-        runJobManager(configuration, executionMode, listeningAddress)
+        runJobManager(configuration, executionMode, listeningHost, listeningPort)
       }
     }
     catch {
       case t: Throwable => {
         LOG.error("Failed to start JobManager.", t)
-        System.exit(FAILURE_RETURN_CODE)
+        System.exit(STARTUP_FAILURE_RETURN_CODE)
       }
     }
   }
@@ -630,26 +650,23 @@ object JobManager {
 
   def runJobManager(configuration: Configuration,
                     executionMode: ExecutionMode,
-                    listeningAddress: Option[(String, Int)]) : Unit = {
+                    listeningAddress: String,
+                    listeningPort: Int) : Unit = {
 
     LOG.info("Starting JobManager")
     LOG.debug("Starting JobManager actor system")
 
     val jobManagerSystem = try {
-      AkkaUtils.createActorSystem(configuration, listeningAddress)
+      AkkaUtils.createActorSystem(configuration, Some((listeningAddress, listeningPort)))
     }
     catch {
       case t: Throwable => {
         if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) {
           val cause = t.getCause()
           if (cause != null && t.getCause().isInstanceOf[java.net.BindException])
{
-            val address = listeningAddress match {
-              case Some((host, port)) => host + ":" + port
-              case None => "unknown"
-            }
-
-            throw new Exception("Unable to create JobManager at address " + address + ":
"
-              + cause.getMessage(), t)
+            val address = listeningAddress + ":" + listeningPort
+            throw new Exception("Unable to create JobManager at address " + address +
+              " - " + cause.getMessage(), t)
           }
         }
         throw new Exception("Could not create JobManager actor system", t)
@@ -662,6 +679,11 @@ object JobManager {
       // bring up the job manager actor
       val (jobManager, archiver) = startJobManagerActors(configuration, jobManagerSystem)
 
+      // start a process reaper that watches the JobManager
+      jobManagerSystem.actorOf(
+        Props(classOf[ProcessReaper], jobManager, LOG, RUNTIME_FAILURE_RETURN_CODE),
+        "JobManager_Process_Reaper")
+
       // bring up a local task manager, if needed
       if(executionMode.equals(LOCAL)){
         LOG.info("Starting embedded TaskManager for JobManager's LOCAL mode execution")
@@ -696,9 +718,9 @@ object JobManager {
    * line arguments.
    *
    * @param args command line arguments
-   * @return triple of configuration, execution mode and an optional listening address
+   * @return Quadruple of configuration, execution mode, listening hostname and listening port
    */
-  def parseArgs(args: Array[String]): (Configuration, ExecutionMode, Option[(String, Int)])
= {
+  def parseArgs(args: Array[String]): (Configuration, ExecutionMode, String, Int) = {
     val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("jobmanager") {
       head("flink jobmanager")
       opt[String]("configDir") action { (arg, c) => c.copy(configDir = arg) } text ("Specify
" +
@@ -728,10 +750,7 @@ object JobManager {
         val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
           ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT)
 
-        // Listening address on which the actor system listens for remote messages
-        val listeningAddress = Some((hostname, port))
-
-        (configuration, config.executionMode, listeningAddress)
+        (configuration, config.executionMode, hostname, port)
     } getOrElse {
       throw new Exception("Wrong arguments. Usage: " + parser.usage)
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/5725c721/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
new file mode 100644
index 0000000..74ed938
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import static org.junit.Assert.*;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.junit.Test;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import scala.Some;
+import scala.Tuple2;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests that the JobManager process properly exits when the JobManager actor dies.
+ */
+public class JobManagerProcessReapingTest {
+
+	private static final int JOB_MANAGER_PORT = 56532;
+
+	@Test
+	public void testReapProcessOnFailure() {
+		Process jmProcess = null;
+		ActorSystem localSystem = null;
+
+		try {
+			String javaCommand = getJavaCommandPath();
+
+			// check that we run this test only if the java command
+			// is available on this machine
+			if (javaCommand == null) {
+				return;
+			}
+
+			// start a JobManager process
+			String[] command = new String[] {
+					javaCommand,
+					"-Dlog.level=OFF",
+					"-Xms256m", "-Xmx256m",
+					"-classpath", getCurrentClasspath(),
+					JobManagerTestEntryPoint.class.getName()
+			};
+
+			ProcessBuilder bld = new ProcessBuilder(command);
+			jmProcess = bld.start();
+
+			// start another actor system so we can send something to the JobManager
+			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost",
0);
+			localSystem = AkkaUtils.createActorSystem(
+					new Configuration(), new Some<Tuple2<String, Object>>(localAddress));
+
+			// grab the reference to the JobManager. try multiple times, until the process
+			// is started and the JobManager is up
+			ActorRef jobManagerRef = null;
+			for (int i = 0; i < 20; i++) {
+				try {
+					jobManagerRef = JobManager.getJobManagerRemoteReference(
+							new InetSocketAddress("localhost", JOB_MANAGER_PORT),
+							localSystem, new FiniteDuration(20, TimeUnit.SECONDS));
+					break;
+				}
+				catch (Throwable t) {
+					// job manager probably not ready yet
+				}
+				Thread.sleep(500);
+			}
+
+			assertTrue("JobManager process died", isProcessAlive(jmProcess));
+			assertTrue("JobManager process did not launch the JobManager properly", jobManagerRef
!= null);
+
+			// kill the JobManager actor
+			jobManagerRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+
+			// wait for max 5 seconds for the process to terminate
+			{
+				long now = System.currentTimeMillis();
+				long deadline = now + 5000;
+
+				while (now < deadline && isProcessAlive(jmProcess)) {
+					Thread.sleep(100);
+					now = System.currentTimeMillis();
+				}
+			}
+
+			assertFalse("JobManager process did not terminate upon actor death", isProcessAlive(jmProcess));
+
+			int returnCode = jmProcess.exitValue();
+			assertEquals("JobManager died, but not because of the process reaper",
+					JobManager.RUNTIME_FAILURE_RETURN_CODE(), returnCode);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (jmProcess != null) {
+				jmProcess.destroy();
+			}
+			if (localSystem != null) {
+				localSystem.shutdown();
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	public static class JobManagerTestEntryPoint {
+
+		public static void main(String[] args) {
+			try {
+				Configuration cfg = new Configuration();
+				cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
+				cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, JOB_MANAGER_PORT);
+
+				JobManager.runJobManager(cfg, ExecutionMode.CLUSTER(), "localhost", JOB_MANAGER_PORT);
+				System.exit(0);
+			}
+			catch (Throwable t) {
+				System.exit(1);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5725c721/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
index c0928b0..63c8d14 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerStartupTest.java
@@ -28,15 +28,20 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.net.NetUtils;
 import org.junit.Test;
 
-import scala.Some;
-import scala.Tuple2;
-
+/**
+ * Tests that verify the startup behavior of the JobManager in failure
+ * situations, when the JobManager cannot be started.
+ */
 public class JobManagerStartupTest {
 
+	/**
+	 * Verifies that the JobManager fails fast (and with an expressive error message)
+	 * when the port to listen on is already in use.
+	 */
 	@Test
 	public void testStartupWithPortInUse() {
 		
-		ServerSocket portOccupier = null;
+		ServerSocket portOccupier;
 		final int portNum;
 		
 		try {
@@ -49,8 +54,7 @@ public class JobManagerStartupTest {
 		}
 		
 		try {
-			Tuple2<String, Object> connection = new Tuple2<String, Object>("localhost",
portNum);
-			JobManager.runJobManager(new Configuration(), ExecutionMode.CLUSTER(), new Some<Tuple2<String,
Object>>(connection));
+			JobManager.runJobManager(new Configuration(), ExecutionMode.CLUSTER(), "localhost", portNum);
 			fail("this should throw an exception");
 		}
 		catch (Exception e) {
@@ -68,6 +72,10 @@ public class JobManagerStartupTest {
 		}
 	}
 
+	/**
+	 * Verifies that the JobManager fails fast (and with an expressive error message)
+	 * when one of its components (here the BLOB server) fails to start properly.
+	 */
 	@Test
 	public void testJobManagerStartupFails() {
 		final int portNum;
@@ -78,12 +86,11 @@ public class JobManagerStartupTest {
 			// skip test if we cannot find a free port
 			return;
 		}
-		Tuple2<String, Object> connection = new Tuple2<String, Object>("localhost",
portNum);
 		Configuration failConfig = new Configuration();
 		failConfig.setString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, "/does-not-exist-no-sir");
 
 		try {
-			JobManager.runJobManager(failConfig, ExecutionMode.CLUSTER(), new Some<Tuple2<String,
Object>>(connection));
+			JobManager.runJobManager(failConfig, ExecutionMode.CLUSTER(), "localhost", portNum);
 			fail("this should fail with an exception");
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5725c721/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
new file mode 100644
index 0000000..702d656
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.jobmanager;
+
+import com.typesafe.config.Config;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.junit.Test;
+import scala.Some;
+import scala.Tuple2;
+
+import java.net.InetAddress;
+
+import static org.junit.Assert.*;
+
+public class JobManagerTest {
+
+	@Test
+	public void testNullHostnameGoesToLocalhost() {
+		try {
+			Tuple2<String, Object> address = new Tuple2<String, Object>(null, 1772);
+			Config cfg = AkkaUtils.getAkkaConfig(new Configuration(),
+					new Some<Tuple2<String, Object>>(address));
+
+			String hostname = cfg.getString("akka.remote.netty.tcp.hostname");
+			assertTrue(InetAddress.getByName(hostname).isLoopbackAddress());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5725c721/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 5967bdd..e69687f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -45,9 +44,13 @@ import java.util.concurrent.TimeUnit;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests that the JobManager correctly handles jobs that fail
+ * during initialization in the submit phase.
+ */
 public class JobSubmitTest {
 
-	private static final long TIMEOUT = 50000;
+	private static final long TIMEOUT = 5000;
 
 	private static ActorSystem jobManagerSystem;
 	private static ActorRef jobManager;
@@ -117,6 +120,10 @@ public class JobSubmitTest {
 		}
 	}
 
+	/**
+	 * Verifies a correct error message when vertices with master initialization
+	 * (input formats / output formats) fail.
+	 */
 	@Test
 	public void testFailureWhenInitializeOnMasterFails() {
 		try {
@@ -130,7 +137,6 @@ public class JobSubmitTest {
 				}
 			};
 
-			TaskConfig config = new TaskConfig(jobVertex.getConfiguration());
 			jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
 			JobGraph jg = new JobGraph("test job", jobVertex);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5725c721/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index 6497822..fc52417 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.testutils;
 
 import static org.junit.Assert.fail;
@@ -29,9 +28,9 @@ import java.io.File;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
 
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
@@ -42,52 +41,6 @@ import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
 public class CommonTestUtils {
 
 	/**
-	 * Constructs a random filename. The filename is a string of 16 hex characters followed
by a <code>.dat</code>
-	 * prefix.
-	 * 
-	 * @return the random filename
-	 */
-	public static String getRandomFilename() {
-
-		final char[] alphabeth = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b',
'c', 'd', 'e', 'f' };
-
-		String filename = "";
-		for (int i = 0; i < 16; i++) {
-			filename += alphabeth[(int) (Math.random() * alphabeth.length)];
-		}
-
-		return filename + ".dat";
-	}
-
-	/**
-	 * Constructs a random directory name. The directory is a string of 16 hex characters
-	 * prefix.
-	 * 
-	 * @return the random directory name
-	 */
-	public static String getRandomDirectoryName() {
-
-		final char[] alphabeth = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b',
'c', 'd', 'e', 'f' };
-
-		String filename = "";
-		for (int i = 0; i < 16; i++) {
-			filename += alphabeth[(int) (Math.random() * alphabeth.length)];
-		}
-
-		return filename;
-	}
-
-	/**
-	 * Reads the path to the directory for temporary files from the configuration and returns
it.
-	 * 
-	 * @return the path to the directory for temporary files
-	 */
-	public static String getTempDir() {
-		return GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
-			ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(File.pathSeparator)[0];
-	}
-
-	/**
 	 * Creates a copy of the given {@link IOReadableWritable} object by an in-memory serialization
and subsequent
 	 * deserialization.
 	 * 
@@ -162,8 +115,13 @@ public class CommonTestUtils {
 		
 		return copy;
 	}
-	
-	
+
+	/**
+	 * Sleeps for a given set of milliseconds, uninterruptibly. If interrupt is called,
+	 * the sleep will continue nonetheless.
+	 *
+	 * @param msecs The number of milliseconds to sleep.
+	 */
 	public static void sleepUninterruptibly(long msecs) {
 		
 		long now = System.currentTimeMillis();
@@ -179,4 +137,71 @@ public class CommonTestUtils {
 			now = System.currentTimeMillis();
 		}
 	}
+
+	/**
+	 * Gets the classpath with which the current JVM was started.
+	 *
+	 * @return The classpath with which the current JVM was started.
+	 */
+	public static String getCurrentClasspath() {
+		RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
+		return bean.getClassPath();
+	}
+
+	/**
+	 * Tries to get the java executable command with which the current JVM was started.
+	 * Returns null, if the command could not be found.
+	 *
+	 * @return The java executable command.
+	 */
+	public static String getJavaCommandPath() {
+		File javaHome = new File(System.getProperty("java.home"));
+
+		String path1 = new File(javaHome, "java").getAbsolutePath();
+		String path2 = new File(new File(javaHome, "bin"), "java").getAbsolutePath();
+
+		try {
+			ProcessBuilder bld = new ProcessBuilder(path1, "-version");
+			Process process = bld.start();
+			if (process.waitFor() == 0) {
+				return path1;
+			}
+		}
+		catch (Throwable t) {
+			// ignore and try the second path
+		}
+
+		try {
+			ProcessBuilder bld = new ProcessBuilder(path2, "-version");
+			Process process = bld.start();
+			if (process.waitFor() == 0) {
+				return path2;
+			}
+		}
+		catch (Throwable tt) {
+			// no luck
+		}
+		return null;
+	}
+
+	/**
+	 * Checks whether a process is still alive. Utility method for JVM versions before 1.8,
+	 * where no direct method to check that is available.
+	 *
+	 * @param process The process to check.
+	 * @return True, if the process is alive, false otherwise.
+	 */
+	public static boolean isProcessAlive(Process process) {
+		if (process == null) {
+			return false;
+
+		}
+		try {
+			process.exitValue();
+			return false;
+		}
+		catch(IllegalThreadStateException e) {
+			return true;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5725c721/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InterruptibleByteChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InterruptibleByteChannel.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InterruptibleByteChannel.java
deleted file mode 100644
index dd0a5d8..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/InterruptibleByteChannel.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.testutils;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import java.util.ArrayDeque;
-import java.util.Queue;
-
-/**
- * This class is a special test implementation of a {@link ReadableByteChannel} and {@link
WritableByteChannel}. Data is
- * first written into main memory through the {@link WritableByteChannel} interface. Afterwards,
the data can be read
- * again through the {@link ReadableByteChannel} abstraction. The implementation is capable
of simulating interruptions
- * in the byte stream.
- * <p>
- * This class is not thread-safe.
- * 
- */
-public class InterruptibleByteChannel implements ReadableByteChannel, WritableByteChannel
{
-
-	/**
-	 * The initial size of the internal memory buffer in bytes.
-	 */
-	private static final int INITIAL_BUFFER_SIZE = 8192;
-
-	/**
-	 * Stores the requested interruptions of the byte stream during write operations that still
have to be processed.
-	 */
-	private final Queue<Integer> writeInterruptPositions = new ArrayDeque<Integer>();
-
-	/**
-	 * Stores the requested interruptions of the byte stream during read operations that still
have to be processed.
-	 */
-	private final Queue<Integer> readInterruptPositions = new ArrayDeque<Integer>();
-
-	/**
-	 * The internal memory buffer used to hold the written data.
-	 */
-	private ByteBuffer buffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
-
-	/**
-	 * Stores if the channel is still open.
-	 */
-	private boolean isOpen = true;
-
-	/**
-	 * Stores if the channel is still in write phase.
-	 */
-	private boolean isInWritePhase = true;
-
-	/**
-	 * Constructs a new interruptible byte channel.
-	 * 
-	 * @param writeInterruptPositions
-	 *        the positions to interrupt the byte stream during write operations or <code>null</code>
if no
-	 *        interruptions are requested
-	 * @param readInterruptPositions
-	 *        the positions to interrupt the byte stream during read operations or <code>null</code>
if no interruptions
-	 *        are requested
-	 */
-	public InterruptibleByteChannel(final int[] writeInterruptPositions, final int[] readInterruptPositions)
{
-
-		if (writeInterruptPositions != null) {
-
-			for (int i = 0; i < writeInterruptPositions.length - 1; ++i) {
-				if (writeInterruptPositions[i] > writeInterruptPositions[i + 1]) {
-					throw new IllegalArgumentException("Write interrupt positions must be ordered ascendingly");
-				}
-				this.writeInterruptPositions.add(Integer.valueOf(writeInterruptPositions[i]));
-			}
-
-			this.writeInterruptPositions.add(Integer
-				.valueOf(writeInterruptPositions[writeInterruptPositions.length - 1]));
-		}
-
-		if (readInterruptPositions != null) {
-
-			for (int i = 0; i < readInterruptPositions.length - 1; ++i) {
-				if (readInterruptPositions[i] > readInterruptPositions[i + 1]) {
-					throw new IllegalArgumentException("Read interrupt positions must be ordered ascendingly");
-				}
-				this.readInterruptPositions.add(Integer.valueOf(readInterruptPositions[i]));
-			}
-
-			this.readInterruptPositions.add(Integer
-				.valueOf(readInterruptPositions[readInterruptPositions.length - 1]));
-		}
-	}
-
-
-	@Override
-	public boolean isOpen() {
-
-		return this.isOpen;
-	}
-
-
-	@Override
-	public void close() throws IOException {
-
-		this.isOpen = false;
-	}
-
-
-	@Override
-	public int write(final ByteBuffer src) throws IOException {
-
-		if (!this.isOpen) {
-			throw new ClosedChannelException();
-		}
-
-		if (!this.isInWritePhase) {
-			throw new IllegalStateException("Channel is not in write phase anymore");
-		}
-
-		if (src.remaining() > this.buffer.remaining()) {
-			increaseBufferSize();
-		}
-
-		int numberOfBytesToAccept = src.remaining();
-		if (!this.writeInterruptPositions.isEmpty()
-			&& (this.buffer.position() + numberOfBytesToAccept < this.writeInterruptPositions.peek().intValue()))
{
-			numberOfBytesToAccept = this.writeInterruptPositions.poll().intValue() - this.buffer.position();
-
-			this.buffer.limit(this.buffer.position() + numberOfBytesToAccept);
-			this.buffer.put(src);
-			this.buffer.limit(this.buffer.capacity());
-
-			return numberOfBytesToAccept;
-		}
-
-		this.buffer.put(src);
-
-		return numberOfBytesToAccept;
-	}
-
-
-	@Override
-	public int read(final ByteBuffer dst) throws IOException {
-
-		if (!this.isOpen) {
-			throw new ClosedChannelException();
-		}
-
-		if (this.isInWritePhase) {
-			throw new IllegalStateException("Channel is still in write phase");
-		}
-
-		if (!this.buffer.hasRemaining()) {
-			return -1;
-		}
-
-		int numberOfBytesToRetrieve = Math.min(this.buffer.remaining(), dst.remaining());
-		if (!this.readInterruptPositions.isEmpty()
-				&& (this.buffer.position() + numberOfBytesToRetrieve > this.readInterruptPositions.peek().intValue()))
{
-			numberOfBytesToRetrieve = this.readInterruptPositions.poll().intValue() - this.buffer.position();
-		}
-
-		final int oldLimit = this.buffer.limit();
-		this.buffer.limit(this.buffer.position() + numberOfBytesToRetrieve);
-		dst.put(this.buffer);
-		this.buffer.limit(oldLimit);
-
-		return numberOfBytesToRetrieve;
-	}
-
-	/**
-	 * Switches the channel to read phase.
-	 */
-	public void switchToReadPhase() {
-
-		if (!this.isInWritePhase) {
-			throw new IllegalStateException("Channel is already in read phase");
-		}
-
-		this.isInWritePhase = false;
-		this.buffer.flip();
-	}
-
-	/**
-	 * Doubles the capacity of the internal byte buffer while preserving its content.
-	 */
-	private void increaseBufferSize() {
-
-		final ByteBuffer newBuf = ByteBuffer.allocate(this.buffer.capacity() * 2);
-		this.buffer.flip();
-		newBuf.put(this.buffer);
-		this.buffer = newBuf;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5725c721/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ServerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ServerTestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ServerTestUtils.java
deleted file mode 100644
index afc32ec..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ServerTestUtils.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.testutils;
-
-import static org.junit.Assert.fail;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
-import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-
-/**
- * This class contains a selection of utility functions which are used for testing the nephele-server
module.
- */
-public final class ServerTestUtils {
-
-	/**
-	 * Private constructor.
-	 */
-	private ServerTestUtils() {}
-
-	/**
-	 * Creates a file with a random name in the given sub directory within the directory for
temporary files. The
-	 * directory for temporary files is read from the configuration. The file contains a sequence
of integer numbers
-	 * from 0 to <code>limit</code>. The individual numbers are separated by a newline.
-	 * 
-	 * @param subDirectory
-	 *        name of the sub directory to create the input file in
-	 * @param limit
-	 *        the upper bound for the sequence of integer numbers to generate
-	 * @return a {@link File} object referring to the created file
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while writing the file
-	 */
-	public static File createInputFile(String subDirectory, int limit) throws IOException {
-
-		if (limit < 0) {
-			throw new IllegalArgumentException("limit must be >= 0");
-		}
-
-		final File inputFile = new File(getTempDir() + File.separator + subDirectory + File.separator
-			+ getRandomFilename());
-
-		if (inputFile.exists()) {
-			inputFile.delete();
-		}
-
-		inputFile.createNewFile();
-		FileWriter fw = new FileWriter(inputFile);
-		for (int i = 0; i < limit; i++) {
-
-			fw.write(Integer.toString(i) + "\n");
-		}
-		fw.close();
-
-		return inputFile;
-	}
-
-	/**
-	 * Creates a file with a random name in the directory for temporary files. The directory
for temporary files is read
-	 * from the configuration. The file contains a sequence of integer numbers from 0 to <code>limit</code>.
The
-	 * individual numbers are separated by a newline.
-	 * 
-	 * @param limit
-	 *        the upper bound for the sequence of integer numbers to generate
-	 * @return a {@link File} object referring to the created file
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while writing the file
-	 */
-	public static File createInputFile(int limit) throws IOException {
-		return createInputFile("", limit);
-	}
-
-	/**
-	 * Constructs a random filename. The filename is a string of 16 hex characters followed
by a <code>.dat</code>
-	 * prefix.
-	 * 
-	 * @return the random filename
-	 */
-	public static String getRandomFilename() {
-
-		final char[] alphabeth = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b',
'c', 'd', 'e', 'f' };
-
-		String filename = "";
-		for (int i = 0; i < 16; i++) {
-			filename += alphabeth[(int) (Math.random() * alphabeth.length)];
-		}
-
-		return filename + ".dat";
-	}
-
-	/**
-	 * Returns the path to the directory for temporary files.
-	 * 
-	 * @return the path to the directory for temporary files
-	 */
-	public static String getTempDir() {
-
-		return System.getProperty("java.io.tmpdir");
-	}
-
-	/**
-	 * Creates a copy of the given {@link IOReadableWritable} object by an in-memory serialization
and subsequent
-	 * deserialization.
-	 * 
-	 * @param original
-	 *        the original object to be copied
-	 * @return the copy of original object created by the original object's serialization/deserialization
methods
-	 * @throws IOException
-	 *         thrown if an error occurs while creating the copy of the object
-	 */
-	@SuppressWarnings("unchecked")
-	public static <T extends IOReadableWritable> T createCopy(final T original) throws
IOException {
-
-		final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		final DataOutputStream dos = new DataOutputStream(baos);
-
-		original.write(new OutputViewDataOutputStreamWrapper(dos));
-
-		final String className = original.getClass().getName();
-		if (className == null) {
-			fail("Class name is null");
-		}
-
-		Class<T> clazz = null;
-
-		try {
-			clazz = (Class<T>) Class.forName(className);
-		} catch (ClassNotFoundException e) {
-			fail(e.getMessage());
-		}
-
-		if (clazz == null) {
-			fail("Cannot find class with name " + className);
-		}
-
-		T copy = null;
-		try {
-			copy = clazz.newInstance();
-		} catch (InstantiationException e) {
-			fail(e.getMessage());
-		} catch (IllegalAccessException e) {
-			fail(e.getMessage());
-		}
-
-		if (copy == null) {
-			fail("Copy of object of type " + className + " is null");
-		}
-
-		final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-		final DataInputStream dis = new DataInputStream(bais);
-
-		copy.read(new InputViewDataInputStreamWrapper(dis));
-
-		return copy;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5725c721/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
index 7da4125..5690475 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
@@ -41,6 +41,7 @@ import java.util.Arrays;
 import java.util.Collection;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 @RunWith(Parameterized.class)
@@ -153,11 +154,13 @@ public class JobSubmissionFailsITCase {
 						try {
 							submitJob(jobGraph, jobClient);
 							fail("Expected JobSubmissionException.");
-						} catch (JobSubmissionException e) {
-							assertEquals("Job is empty.", e.getMessage());
-						} catch (Throwable t) {
-							fail("Caught wrong exception of type " + t.getClass() + ".");
+						}
+						catch (JobSubmissionException e) {
+							assertTrue(e.getMessage() != null && e.getMessage().contains("empty"));
+						}
+						catch (Throwable t) {
 							t.printStackTrace();
+							fail("Caught wrong exception of type " + t.getClass() + ".");
 						}
 
 						try {

http://git-wip-us.apache.org/repos/asf/flink/blob/5725c721/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 6232332..a0cc29f 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -195,7 +195,7 @@ object ApplicationMaster {
     val args = Array[String]("--configDir", currDir)
 
     LOG.info(s"Config path: $currDir.")
-    val (configuration, _, _) = JobManager.parseArgs(args)
+    val (configuration, _, _, _) = JobManager.parseArgs(args)
 
     // add dynamic properties to JobManager configuration.
     val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)


Mime
View raw message