flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/3] flink git commit: [FLINK-1631] [client] Overhaul of the client.
Date Wed, 04 Mar 2015 17:39:13 GMT
Repository: flink
Updated Branches:
  refs/heads/master 0333109bb -> 9c6413740


http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
new file mode 100644
index 0000000..7b0dd2b
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import static org.mockito.Mockito.*;
+
+import java.net.InetSocketAddress;
+
+import org.apache.flink.client.cli.CommandLineOptions;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests that verify that the CLI client picks up the correct address for the JobManager
+ * from configuration and configs.
+ */
+public class CliFrontendAddressConfigurationTest {
+	
+	@BeforeClass
+	public static void init() {
+		CliFrontendTestUtils.pipeSystemOutToNull();
+	}
+	
+	@Before
+	public void clearConfig() {
+		CliFrontendTestUtils.clearGlobalConfiguration();
+	}
+
+	@Test
+	public void testInvalidConfigAndNoOption() {
+		try {
+			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
+			CommandLineOptions options = mock(CommandLineOptions.class);
+
+			try {
+				frontend.getJobManagerAddress(options);
+				fail("we expect an exception here because the we have no config");
+			}
+			catch (Exception e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testInvalidConfigAndOption() {
+		try {
+			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
+
+			CommandLineOptions options = mock(CommandLineOptions.class);
+			when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
+
+			assertNotNull(frontend.getJobManagerAddress(options));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testValidConfig() {
+		try {
+			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+
+			CommandLineOptions options = mock(CommandLineOptions.class);
+			InetSocketAddress address = frontend.getJobManagerAddress(options);
+			
+			assertNotNull(address);
+			assertEquals(CliFrontendTestUtils.TEST_JOB_MANAGER_ADDRESS, address.getAddress().getHostAddress());
+			assertEquals(CliFrontendTestUtils.TEST_JOB_MANAGER_PORT, address.getPort());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testYarnConfig() {
+		try {
+			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDirWithYarnFile());
+
+			CommandLineOptions options = mock(CommandLineOptions.class);
+			InetSocketAddress address = frontend.getJobManagerAddress(options);
+			
+			assertNotNull(address);
+			assertEquals(CliFrontendTestUtils.TEST_YARN_JOB_MANAGER_ADDRESS, address.getAddress().getHostAddress());
+			assertEquals(CliFrontendTestUtils.TEST_YARN_JOB_MANAGER_PORT, address.getPort());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testInvalidYarnConfig() {
+		try {
+			CliFrontend cli = new CliFrontend(CliFrontendTestUtils.getConfigDirWithInvalidYarnFile());
+
+			CommandLineOptions options = mock(CommandLineOptions.class);
+
+			InetSocketAddress address = cli.getJobManagerAddress(options);
+
+			assertEquals(CliFrontendTestUtils.TEST_JOB_MANAGER_ADDRESS, address.getAddress().getHostAddress());
+			assertEquals(CliFrontendTestUtils.TEST_JOB_MANAGER_PORT, address.getPort());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testManualOptionsOverridesConfig() {
+		try {
+			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+
+			CommandLineOptions options = mock(CommandLineOptions.class);
+			when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
+
+			InetSocketAddress address = frontend.getJobManagerAddress(options);
+			
+			assertNotNull(address);
+			assertEquals("10.221.130.22", address.getAddress().getHostAddress());
+			assertEquals(7788, address.getPort());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testManualOptionsOverridesYarn() {
+		try {
+			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDirWithYarnFile());
+
+			CommandLineOptions options = mock(CommandLineOptions.class);
+			when(options.getJobManagerAddress()).thenReturn("10.221.130.22:7788");
+
+			InetSocketAddress address = frontend.getJobManagerAddress(options);
+			
+			assertNotNull(address);
+			assertEquals("10.221.130.22", address.getAddress().getHostAddress());
+			assertEquals(7788, address.getPort());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
index c3b7021..ed885cf 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.client;
 
-import org.apache.commons.cli.CommandLine;
-import org.apache.flink.client.CliFrontendTestUtils.TestingCliFrontend;
+import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -29,7 +27,6 @@ import org.apache.flink.configuration.Configuration;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 
@@ -49,21 +46,20 @@ public class CliFrontendInfoTest {
 			// test unrecognized option
 			{
 				String[] parameters = {"-v", "-l"};
-				CliFrontend testFrontend = new CliFrontend();
+				CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
 				int retCode = testFrontend.cancel(parameters);
-				assertTrue(retCode == 1);
+				assertTrue(retCode != 0);
 			}
 			
 			// test missing options
 			{
 				String[] parameters = {};
-				CliFrontend testFrontend = new CliFrontend();
+				CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
 				int retCode = testFrontend.cancel(parameters);
 				assertTrue(retCode != 0);
 			}
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail("Program caused an exception: " + e.getMessage());
 		}
@@ -72,13 +68,12 @@ public class CliFrontendInfoTest {
 	@Test
 	public void testShowExecutionPlan() {
 		try {
-			String[] parameters = {CliFrontendTestUtils.getTestJarPath()};
+			String[] parameters = new String[] { CliFrontendTestUtils.getTestJarPath() };
 			InfoTestCliFrontend testFrontend = new InfoTestCliFrontend(-1);
 			int retCode = testFrontend.info(parameters);
 			assertTrue(retCode == 0);
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail("Program caused an exception: " + e.getMessage());
 		}
@@ -93,7 +88,6 @@ public class CliFrontendInfoTest {
 			assertTrue(retCode == 0);
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail("Program caused an exception: " + e.getMessage());
 		}
@@ -101,22 +95,20 @@ public class CliFrontendInfoTest {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	private static final class InfoTestCliFrontend extends TestingCliFrontend {
+	private static final class InfoTestCliFrontend extends CliFrontend {
 		
 		private final int expectedDop;
 		
-		public InfoTestCliFrontend(int expectedDop) {
+		public InfoTestCliFrontend(int expectedDop) throws Exception {
+			super(CliFrontendTestUtils.getConfigDir());
 			this.expectedDop = expectedDop;
 		}
 
 		@Override
-		protected Client getClient(CommandLine line, ClassLoader loader, String programName) throws IOException {
-			try {
-				return new TestClient(expectedDop);
-			}
-			catch (Exception e) {
-				throw new IOException(e);
-			}
+		protected Client getClient(CommandLineOptions options, ClassLoader loader, String programName)
+				throws Exception
+		{
+			return new TestClient(expectedDop);
 		}
 	}
 	
@@ -125,13 +117,16 @@ public class CliFrontendInfoTest {
 		private final int expectedDop;
 		
 		private TestClient(int expectedDop) throws Exception {
-			super(new InetSocketAddress(InetAddress.getLocalHost(), 6176), new Configuration(), CliFrontendInfoTest.class.getClassLoader());
+			super(new InetSocketAddress(InetAddress.getLocalHost(), 6176),
+					new Configuration(), CliFrontendInfoTest.class.getClassLoader());
 			
 			this.expectedDop = expectedDop;
 		}
 		
 		@Override
-		public String getOptimizedPlanAsJson(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException  {
+		public String getOptimizedPlanAsJson(PackagedProgram prog, int parallelism)
+				throws CompilerException, ProgramInvocationException
+		{
 			assertEquals(this.expectedDop, parallelism);
 			return "";
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/test/java/org/apache/flink/client/CliFrontendJobManagerConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendJobManagerConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendJobManagerConnectionTest.java
deleted file mode 100644
index ef7dff6..0000000
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendJobManagerConnectionTest.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.client;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-
-import java.net.InetSocketAddress;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-import org.apache.flink.client.CliFrontendTestUtils.TestingCliFrontend;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class CliFrontendJobManagerConnectionTest {
-	
-	@BeforeClass
-	public static void init() {
-		CliFrontendTestUtils.pipeSystemOutToNull();
-	}
-	
-	@Before
-	public void clearConfig() {
-		CliFrontendTestUtils.clearGlobalConfiguration();
-	}
-
-	@Test
-	public void testInvalidConfig() {
-		try {
-			String[] arguments = {};
-			CommandLine line = new PosixParser().parse(CliFrontend.getJobManagerAddressOption(new Options()), arguments, false);
-				
-			TestingCliFrontend frontend = new TestingCliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
-			
-			assertTrue(frontend.getJobManagerAddressString(line) == null);
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Program caused an exception: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testValidConfig() {
-		try {
-			String[] arguments = {};
-			CommandLine line = new PosixParser().parse(CliFrontend.getJobManagerAddressOption(new Options()), arguments, false);
-				
-			TestingCliFrontend frontend = new TestingCliFrontend(CliFrontendTestUtils.getConfigDir());
-			
-			InetSocketAddress address = RemoteExecutor.getInetFromHostport(frontend.getJobManagerAddressString(line));
-			
-			assertNotNull(address);
-			assertEquals(CliFrontendTestUtils.TEST_JOB_MANAGER_ADDRESS, address.getAddress().getHostAddress());
-			assertEquals(CliFrontendTestUtils.TEST_JOB_MANAGER_PORT, address.getPort());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Program caused an exception: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testYarnConfig() {
-		try {
-			String[] arguments = {};
-			CommandLine line = new PosixParser().parse(CliFrontend.getJobManagerAddressOption(new Options()), arguments, false);
-				
-			TestingCliFrontend frontend = new TestingCliFrontend(CliFrontendTestUtils.getConfigDirWithYarnFile());
-			
-			InetSocketAddress address = RemoteExecutor.getInetFromHostport(frontend.getJobManagerAddressString(line));
-			
-			assertNotNull(address);
-			assertEquals(CliFrontendTestUtils.TEST_YARN_JOB_MANAGER_ADDRESS, address.getAddress().getHostAddress());
-			assertEquals(CliFrontendTestUtils.TEST_YARN_JOB_MANAGER_PORT, address.getPort());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Program caused an exception: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testInvalidYarnConfig() {
-		try {
-			String[] arguments = {};
-			CommandLine line = new PosixParser().parse(CliFrontend.getJobManagerAddressOption(new Options()), arguments, false);
-				
-			TestingCliFrontend frontend = new TestingCliFrontend(CliFrontendTestUtils.getConfigDirWithInvalidYarnFile());
-			
-			assertTrue(frontend.getJobManagerAddressString(line) == null);
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Program caused an exception: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testManualOptionsOverridesConfig() {
-		try {
-			String[] arguments = {"-m", "10.221.130.22:7788"};
-			CommandLine line = new PosixParser().parse(CliFrontend.getJobManagerAddressOption(new Options()), arguments, false);
-				
-			TestingCliFrontend frontend = new TestingCliFrontend(CliFrontendTestUtils.getConfigDir());
-			
-			InetSocketAddress address = RemoteExecutor.getInetFromHostport(frontend.getJobManagerAddressString(line));
-			
-			assertNotNull(address);
-			assertEquals("10.221.130.22", address.getAddress().getHostAddress());
-			assertEquals(7788, address.getPort());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Program caused an exception: " + e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testManualOptionsOverridesYarn() {
-		try {
-			String[] arguments = {"-m", "10.221.130.22:7788"};
-			CommandLine line = new PosixParser().parse(CliFrontend.getJobManagerAddressOption(new Options()), arguments, false);
-				
-			TestingCliFrontend frontend = new TestingCliFrontend(CliFrontendTestUtils.getConfigDirWithYarnFile());
-			
-			InetSocketAddress address = RemoteExecutor.getInetFromHostport(frontend.getJobManagerAddressString(line));
-			
-			assertNotNull(address);
-			assertEquals("10.221.130.22", address.getAddress().getHostAddress());
-			assertEquals(7788, address.getPort());
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Program caused an exception: " + e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index fafe929..2712c19 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -20,14 +20,15 @@ package org.apache.flink.client;
 
 import akka.actor.*;
 import akka.testkit.JavaTestKit;
-import org.apache.commons.cli.CommandLine;
-import org.apache.flink.configuration.Configuration;
+
+import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
 import static org.junit.Assert.*;
 
 public class CliFrontendListCancelTest {
@@ -36,11 +37,12 @@ public class CliFrontendListCancelTest {
 
 	@BeforeClass
 	public static void setup(){
+		pipeSystemOutToNull();
 		actorSystem = ActorSystem.create("TestingActorSystem");
 	}
 
 	@AfterClass
-	public static void teardown(){
+	public static void teardown() {
 		JavaTestKit.shutdownActorSystem(actorSystem);
 		actorSystem = null;
 	}
@@ -57,15 +59,15 @@ public class CliFrontendListCancelTest {
 			// test unrecognized option
 			{
 				String[] parameters = {"-v", "-l"};
-				CliFrontend testFrontend = new CliFrontendTestUtils.TestingCliFrontend();
+				CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
 				int retCode = testFrontend.cancel(parameters);
-				assertTrue(retCode == 1);
+				assertTrue(retCode != 0);
 			}
 			
 			// test missing job id
 			{
 				String[] parameters = {};
-				CliFrontend testFrontend = new CliFrontendTestUtils.TestingCliFrontend();
+				CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
 				int retCode = testFrontend.cancel(parameters);
 				assertTrue(retCode != 0);
 			}
@@ -77,29 +79,41 @@ public class CliFrontendListCancelTest {
 
 				final ActorRef jm = actorSystem.actorOf(Props.create(CliJobManager.class, jid));
 				
-				String[] parameters = {jidString};
+				String[] parameters = { jidString };
 				InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(jm);
+
 				int retCode = testFrontend.cancel(parameters);
 				assertTrue(retCode == 0);
 			}
+
+			// test cancel properly
+			{
+				JobID jid1 = new JobID();
+				JobID jid2 = new JobID();
+
+				final ActorRef jm = actorSystem.actorOf(Props.create(CliJobManager.class, jid1));
+
+				String[] parameters = { jid2.toString() };
+				InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(jm);
+
+				assertTrue(testFrontend.cancel(parameters) != 0);
+			}
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
 			fail("Program caused an exception: " + e.getMessage());
 		}
 	}
-	
-	
+
 	@Test
 	public void testList() {
 		try {
 			// test unrecognized option
 			{
 				String[] parameters = {"-v", "-k"};
-				CliFrontend testFrontend = new CliFrontendTestUtils.TestingCliFrontend();
+				CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
 				int retCode = testFrontend.list(parameters);
-				assertTrue(retCode == 1);
+				assertTrue(retCode != 0);
 			}
 			
 			// test list properly
@@ -119,15 +133,19 @@ public class CliFrontendListCancelTest {
 	}
 
 
-	protected static final class InfoListTestCliFrontend extends CliFrontendTestUtils.TestingCliFrontend {
+	protected static final class InfoListTestCliFrontend extends CliFrontend {
+
 		private ActorRef jobmanager;
 
-		public InfoListTestCliFrontend(ActorRef jobmanager){
+
+
+		public InfoListTestCliFrontend(ActorRef jobmanager) throws Exception {
+			super(CliFrontendTestUtils.getConfigDir());
 			this.jobmanager = jobmanager;
 		}
 
 		@Override
-		public ActorRef getJobManager(CommandLine line, Configuration config) {
+		public ActorRef getJobManager(CommandLineOptions options) {
 			return jobmanager;
 		}
 	}
@@ -146,11 +164,16 @@ public class CliFrontendListCancelTest {
 			}
 			else if (message instanceof JobManagerMessages.CancelJob) {
 				JobManagerMessages.CancelJob cancelJob = (JobManagerMessages.CancelJob) message;
-				assertEquals(jobID, cancelJob.jobID());
-				getSender().tell(new Status.Success(new Object()), getSelf());
+
+				if (jobID != null && jobID.equals(cancelJob.jobID())) {
+					getSender().tell(new Status.Success(new Object()), getSelf());
+				}
+				else {
+					getSender().tell(new Status.Failure(new Exception("Wrong or no JobID")), getSelf());
+				}
 			}
-			else if(message instanceof  JobManagerMessages.RequestRunningJobs$) {
-				getSender().tell(new JobManagerMessages.RunningJobs(), getSelf());
+			else if (message instanceof JobManagerMessages.RequestRunningJobsStatus$) {
+				getSender().tell(new JobManagerMessages.RunningJobsStatus(), getSelf());
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
index 7ecce21..0ffe4ca 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.client;
 
 import static org.apache.flink.client.CliFrontendTestUtils.TEST_JAR_CLASSLOADERTEST_CLASS;
@@ -24,20 +23,22 @@ import static org.apache.flink.client.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS;
 import static org.apache.flink.client.CliFrontendTestUtils.getNonJarFilePath;
 import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
 import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.flink.client.cli.CliArgsException;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.InfoOptions;
+import org.apache.flink.client.cli.ProgramOptions;
+import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -55,142 +56,150 @@ public class CliFrontendPackageProgramTest {
 	@Test
 	public void testNonExistingJarFile() {
 		try {
-			String[] arguments = {"-j", "/some/none/existing/path", "-a", "--verbose", "true", "arg1", "arg2"};
-			CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), arguments, true);
-				
-			CliFrontend frontend = new CliFrontend();
-			Object result = frontend.buildProgram(line);
-		}
-		catch (FileNotFoundException e) {
-			// that's what we want
+			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+
+			ProgramOptions options = mock(ProgramOptions.class);
+			when(options.getJarFilePath()).thenReturn("/some/none/existing/path");
+
+			try {
+				frontend.buildProgram(options);
+				fail("should throw an exception");
+			}
+			catch (FileNotFoundException e) {
+				// that's what we want
+			}
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
-			fail("Program caused an exception: " + e.getMessage());
+			fail(e.getMessage());
 		}
 	}
 	
 	@Test
 	public void testFileNotJarFile() {
 		try {
-			String[] arguments = {"-j", getNonJarFilePath(), "-a", "--verbose", "true", "arg1", "arg2"};
-			CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), arguments, true);
-			
-			CliFrontend frontend = new CliFrontend();
-			Object result = frontend.buildProgram(line);
-		}
-		catch (ProgramInvocationException e) {
-			// that's what we want
+			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+
+			ProgramOptions options = mock(ProgramOptions.class);
+			when(options.getJarFilePath()).thenReturn(getNonJarFilePath());
+
+			try {
+				frontend.buildProgram(options);
+				fail("should throw an exception");
+			}
+			catch (ProgramInvocationException e) {
+				// that's what we want
+			}
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
-			fail("Program caused an exception: " + e.getMessage());
+			fail(e.getMessage());
 		}
 	}
 	
 	@Test
 	public void testVariantWithExplicitJarAndArgumentsOption() {
 		try {
-			String[] arguments = {"-j", getTestJarPath(), "-a", "--verbose", "true", "arg1", "arg2"};
-			CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), arguments, true);
-			
-			CliFrontend frontend = new CliFrontend();
-			Object result = frontend.buildProgram(line);
-			assertTrue(result instanceof PackagedProgram);
-			
-			PackagedProgram prog = (PackagedProgram) result;
+			String[] arguments = {"-j", getTestJarPath(), "-a", "--debug", "true", "arg1", "arg2"};
+			String[] reducedArguments = new String[] {"--debug", "true", "arg1", "arg2"};
+
+			RunOptions options = CliFrontendParser.parseRunCommand(arguments);
+			assertEquals(getTestJarPath(), options.getJarFilePath());
+			assertArrayEquals(reducedArguments, options.getProgramArgs());
 
-			Assert.assertArrayEquals(new String[] {"--verbose", "true", "arg1", "arg2"}, prog.getArguments());
+			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+			PackagedProgram prog = frontend.buildProgram(options);
+
+			Assert.assertArrayEquals(reducedArguments, prog.getArguments());
 			Assert.assertEquals(TEST_JAR_MAIN_CLASS, prog.getMainClassName());
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
-			fail("Program caused an exception: " + e.getMessage());
+			fail(e.getMessage());
 		}
 	}
 	
 	@Test
 	public void testVariantWithExplicitJarAndNoArgumentsOption() {
 		try {
-			String[] arguments = {"-j", getTestJarPath(), "--verbose", "true", "arg1", "arg2"};
-			CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), arguments, true);
-			
-			CliFrontend frontend = new CliFrontend();
-			Object result = frontend.buildProgram(line);
-			assertTrue(result instanceof PackagedProgram);
-			
-			PackagedProgram prog = (PackagedProgram) result;
+			String[] arguments = {"-j", getTestJarPath(), "--debug", "true", "arg1", "arg2"};
+			String[] reducedArguments = new String[] {"--debug", "true", "arg1", "arg2"};
+
+			RunOptions options = CliFrontendParser.parseRunCommand(arguments);
+			assertEquals(getTestJarPath(), options.getJarFilePath());
+			assertArrayEquals(reducedArguments, options.getProgramArgs());
 
-			Assert.assertArrayEquals(new String[] {"--verbose", "true", "arg1", "arg2"}, prog.getArguments());
+			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+
+			PackagedProgram prog = frontend.buildProgram(options);
+
+			Assert.assertArrayEquals(reducedArguments, prog.getArguments());
 			Assert.assertEquals(TEST_JAR_MAIN_CLASS, prog.getMainClassName());
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
-			fail("Program caused an exception: " + e.getMessage());
+			fail(e.getMessage());
 		}
 	}
 	
 	@Test
 	public void testValidVariantWithNoJarAndNoArgumentsOption() {
 		try {
-			String[] arguments = {getTestJarPath(), "--verbose", "true", "arg1", "arg2"};
-			CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), arguments, true);
-			
-			CliFrontend frontend = new CliFrontend();
-			Object result = frontend.buildProgram(line);
-			assertTrue(result instanceof PackagedProgram);
-			
-			PackagedProgram prog = (PackagedProgram) result;
+			String[] arguments = {getTestJarPath(), "--debug", "true", "arg1", "arg2"};
+			String[] reducedArguments = {"--debug", "true", "arg1", "arg2"};
+
+			RunOptions options = CliFrontendParser.parseRunCommand(arguments);
+			assertEquals(getTestJarPath(), options.getJarFilePath());
+			assertArrayEquals(reducedArguments, options.getProgramArgs());
 
-			Assert.assertArrayEquals(new String[] {"--verbose", "true", "arg1", "arg2"}, prog.getArguments());
+			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+
+			PackagedProgram prog = frontend.buildProgram(options);
+
+			Assert.assertArrayEquals(reducedArguments, prog.getArguments());
 			Assert.assertEquals(TEST_JAR_MAIN_CLASS, prog.getMainClassName());
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
-			fail("Program caused an exception: " + e.getMessage());
+			fail(e.getMessage());
 		}
 	}
 	
 	@Test
 	public void testNoJarNoArgumentsAtAll() {
 		try {
-			String[] arguments = {};
-			CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), arguments, true);
-				
-			CliFrontend frontend = new CliFrontend();
-			Object result = frontend.buildProgram(line);
-		}
-		catch (FileNotFoundException e) {
-			// that's what we want
+			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+			assertTrue(frontend.run(new String[0]) != 0);
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
-			fail("Program caused an exception: " + e.getMessage());
+			fail(e.getMessage());
 		}
 	}
 	
 	@Test
 	public void testNonExistingFileWithArguments() {
 		try {
-			String[] arguments = {"/some/none/existing/path", "--verbose", "true", "arg1", "arg2"};
-			CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), arguments, true);
-				
-			CliFrontend frontend = new CliFrontend();
-			Object result = frontend.buildProgram(line);
-		}
-		catch (FileNotFoundException e) {
-			// that's what we want
+			String[] arguments = {"/some/none/existing/path", "--debug", "true", "arg1", "arg2"};
+			String[] reducedArguments = {"--debug", "true", "arg1", "arg2"};
+
+			RunOptions options = CliFrontendParser.parseRunCommand(arguments);
+			assertEquals(arguments[0], options.getJarFilePath());
+			assertArrayEquals(reducedArguments, options.getProgramArgs());
+
+			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+
+			try {
+				frontend.buildProgram(options);
+				fail("Should fail with an exception");
+			}
+			catch (FileNotFoundException e) {
+				// that's what we want
+			}
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
-			fail("Program caused an exception: " + e.getMessage());
+			fail(e.getMessage());
 		}
 	}
 	
@@ -198,18 +207,23 @@ public class CliFrontendPackageProgramTest {
 	public void testNonExistingFileWithoutArguments() {
 		try {
 			String[] arguments = {"/some/none/existing/path"};
-			CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), arguments, true);
-				
-			CliFrontend frontend = new CliFrontend();
-			Object result = frontend.buildProgram(line);
-		}
-		catch (FileNotFoundException e) {
-			// that's what we want
+
+			RunOptions options = CliFrontendParser.parseRunCommand(arguments);
+			assertEquals(arguments[0], options.getJarFilePath());
+			assertArrayEquals(new String[0], options.getProgramArgs());
+
+			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+
+			try {
+				frontend.buildProgram(options);
+			}
+			catch (FileNotFoundException e) {
+				// that's what we want
+			}
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
-			fail("Program caused an exception: " + e.getMessage());
+			fail(e.getMessage());
 		}
 	}
 	
@@ -249,15 +263,20 @@ public class CliFrontendPackageProgramTest {
 	@Test
 	public void testPlanWithExternalClass() throws CompilerException, ProgramInvocationException {
 		final boolean[] callme = { false }; // create a final object reference, to be able to change its val later
+
 		try {
-			String[] arguments = {"-j", getTestJarPath(), "-c", TEST_JAR_CLASSLOADERTEST_CLASS , "--verbose", "true", "arg1", "arg2"};
-			CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), arguments, true);
-			
-			CliFrontend frontend = new CliFrontend();
-			Object result = frontend.buildProgram(line);
-			assertTrue(result instanceof PackagedProgram);
+			String[] arguments = {"-c", TEST_JAR_CLASSLOADERTEST_CLASS, getTestJarPath(),
+					"true", "arg1", "arg2" };
+
+			String[] progArgs = { "true", "arg1", "arg2" };
 
-			PackagedProgram prog = spy((PackagedProgram) result);
+			InfoOptions options = CliFrontendParser.parseInfoCommand(arguments);
+			assertEquals(getTestJarPath(), options.getJarFilePath());
+			assertEquals(TEST_JAR_CLASSLOADERTEST_CLASS, options.getEntryPointClassName());
+			assertArrayEquals(progArgs, options.getProgramArgs());
+			
+			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+			PackagedProgram prog = spy(frontend.buildProgram(options));
 
 			ClassLoader testClassLoader = new ClassLoader(prog.getUserCodeClassLoader()) {
 				@Override
@@ -272,8 +291,8 @@ public class CliFrontendPackageProgramTest {
 			};
 			when(prog.getUserCodeClassLoader()).thenReturn(testClassLoader);
 
-			Assert.assertArrayEquals(new String[]{"--verbose", "true", "arg1", "arg2"}, prog.getArguments());
-			Assert.assertEquals(TEST_JAR_CLASSLOADERTEST_CLASS, prog.getMainClassName());
+			assertEquals(TEST_JAR_CLASSLOADERTEST_CLASS, prog.getMainClassName());
+			assertArrayEquals(progArgs, prog.getArguments());
 
 			Configuration c = new Configuration();
 			c.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "devil");
@@ -281,7 +300,7 @@ public class CliFrontendPackageProgramTest {
 			
 			cli.getOptimizedPlanAsJson(prog, 666);
 		}
-		catch(ProgramInvocationException pie) {
+		catch (ProgramInvocationException pie) {
 			assertTrue("Classloader was not called", callme[0]);
 			// class not found exception is expected as some point
 			if( ! ( pie.getCause() instanceof ClassNotFoundException ) ) {
@@ -296,7 +315,5 @@ public class CliFrontendPackageProgramTest {
 			assertTrue("Classloader was not called", callme[0]);
 			fail("Program caused an exception: " + e.getMessage());
 		}
-		
-		
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index dc770ff..8ed99f9 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -42,7 +42,7 @@ public class CliFrontendRunTest {
 			// test unrecognized option
 			{
 				String[] parameters = {"-v", "-l", "-a", "some", "program", "arguments"};
-				TestingCliFrontend testFrontend = new TestingCliFrontend();
+				CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
 				int retCode = testFrontend.run(parameters);
 				assertTrue(retCode != 0);
 			}
@@ -64,31 +64,31 @@ public class CliFrontendRunTest {
 			// test configure parallelism with non integer value
 			{
 				String[] parameters = {"-v", "-p", "text",  getTestJarPath()};
-				TestingCliFrontend testFrontend = new TestingCliFrontend();
+				CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
 				assertTrue(0 != testFrontend.run(parameters));
 			}
 			
 			// test configure parallelism with overflow integer value
 			{
 				String[] parameters = {"-v", "-p", "475871387138",  getTestJarPath()};
-				TestingCliFrontend testFrontend = new TestingCliFrontend();
+				CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
 				assertTrue(0 != testFrontend.run(parameters));
 			}
 		}
 		catch (Exception e) {
-			System.err.println(e.getMessage());
 			e.printStackTrace();
-			fail("Program caused an exception: " + e.getMessage());
+			fail(e.getMessage());
 		}
 	}
 	
 	// --------------------------------------------------------------------------------------------
 	
-	public static final class RunTestingCliFrontend extends TestingCliFrontend {
+	public static final class RunTestingCliFrontend extends CliFrontend {
 		
 		private final int expectedParallelim;
 		
-		public RunTestingCliFrontend(int expectedParallelim) {
+		public RunTestingCliFrontend(int expectedParallelim) throws Exception {
+			super(CliFrontendTestUtils.getConfigDir());
 			this.expectedParallelim = expectedParallelim;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
index 95f6cb8..7d01ab6 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
@@ -81,6 +81,7 @@ public class CliFrontendTestUtils {
 	
 	public static void pipeSystemOutToNull() {
 		System.setOut(new PrintStream(new BlackholeOutputSteam()));
+		System.setErr(new PrintStream(new BlackholeOutputSteam()));
 	}
 	
 	public static void clearGlobalConfiguration() {
@@ -109,20 +110,6 @@ public class CliFrontendTestUtils {
 		
 	}
 	
-	// --------------------------------------------------------------------------------------------
-	
-	public static class TestingCliFrontend extends CliFrontend {
-		
-
-		public TestingCliFrontend() {
-			this(getConfigDir());
-		}
-		
-		public TestingCliFrontend(String configDir) {
-			this.configurationDirectory = configDir;
-		}
-	}
-	
 	private static final class BlackholeOutputSteam extends java.io.OutputStream {
 		@Override
 		public void write(int b){}

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
index 53129bb..e2b7935 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanAfterExecutionTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.client.CliFrontendTestUtils;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -30,6 +32,11 @@ import static org.junit.Assert.*;
 @SuppressWarnings("serial")
 public class ExecutionPlanAfterExecutionTest implements java.io.Serializable {
 
+	@BeforeClass
+	public static void suppressOutput() {
+		CliFrontendTestUtils.pipeSystemOutToNull();
+	}
+
 	@Test
 	public void testExecuteAfterGetExecutionPlan() {
 		ExecutionEnvironment env = new LocalEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-dist/src/main/flink-bin/conf/log4j-cli.properties
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/conf/log4j-cli.properties b/flink-dist/src/main/flink-bin/conf/log4j-cli.properties
index d421333..624219b 100644
--- a/flink-dist/src/main/flink-bin/conf/log4j-cli.properties
+++ b/flink-dist/src/main/flink-bin/conf/log4j-cli.properties
@@ -30,6 +30,7 @@ log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m
 # CliFrontend class when using a per-job YARN cluster.
 log4j.logger.org.apache.flink.yarn=INFO, console
 log4j.logger.org.apache.hadoop=INFO, console
+log4j.logger.org.apache.hadoop.util.NativeCodeLoader=OFF
 
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.layout=org.apache.log4j.PatternLayout

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobStatusMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobStatusMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobStatusMessage.java
new file mode 100644
index 0000000..d2c81a5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobStatusMessage.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+/**
+ * A simple message that holds the state of a job execution.
+ */
+public class JobStatusMessage implements java.io.Serializable {
+
+	private final JobID jobId;
+
+	private final String jobName;
+
+	private final JobStatus jobState;
+
+	private final long startTime;
+
+	public JobStatusMessage(JobID jobId, String jobName, JobStatus jobState, long startTime) {
+		this.jobId = jobId;
+		this.jobName = jobName;
+		this.jobState = jobState;
+		this.startTime = startTime;
+	}
+
+	public JobID getJobId() {
+		return jobId;
+	}
+
+	public String getJobName() {
+		return jobName;
+	}
+
+	public JobStatus getJobState() {
+		return jobState;
+	}
+
+	public long getStartTime() {
+		return startTime;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index b702fdc..54d3cf2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -25,7 +25,7 @@ import akka.actor.Status.{Success, Failure}
 import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration}
 import org.apache.flink.core.io.InputSplitAssigner
 import org.apache.flink.runtime.blob.BlobServer
-import org.apache.flink.runtime.client.{JobSubmissionException, JobExecutionException, JobCancellationException}
+import org.apache.flink.runtime.client.{JobStatusMessage, JobSubmissionException, JobExecutionException, JobCancellationException}
 import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, ExecutionGraph}
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
@@ -350,6 +350,19 @@ class JobManager(val configuration: Configuration,
 
       sender ! RunningJobs(executionGraphs)
 
+    case RequestRunningJobsStatus =>
+      try {
+        val jobs = currentJobs map {
+          case (_, (eg, _)) => new JobStatusMessage(eg.getJobID, eg.getJobName,
+                                            eg.getState, eg.getStatusTimestamp(JobStatus.CREATED))
+        }
+
+        sender ! RunningJobsStatus(jobs)
+      }
+      catch {
+        case t: Throwable => LOG.error("Exception while responding to RequestRunningJobsStatus", t)
+      }
+
     case RequestJob(jobID) =>
       currentJobs.get(jobID) match {
         case Some((eg, _)) => sender ! JobFound(jobID, eg)
@@ -758,9 +771,9 @@ object JobManager {
   def parseArgs(args: Array[String]): (Configuration, ExecutionMode, String, Int) = {
     val parser = new scopt.OptionParser[JobManagerCLIConfiguration]("JobManager") {
       head("Flink JobManager")
-      opt[String]("configDir") action { (arg, c) => c.copy(configDir = arg) } text (
-        "The configuration directory.")
-      opt[String]("executionMode") optional() action { (arg, c) =>
+      opt[String]("configDir") action { (arg, c) => c.copy(configDir = arg) } text {
+        "The configuration directory." }
+      opt[String]("executionMode") action { (arg, c) =>
         if (arg.equalsIgnoreCase("local")){
           c.copy(executionMode = LOCAL)
         } else if (arg.equalsIgnoreCase("cluster")) {
@@ -775,6 +788,14 @@ object JobManager {
 
     parser.parse(args, JobManagerCLIConfiguration()) map {
       config =>
+
+        if (config.configDir == null) {
+          throw new Exception("Missing parameter '--configDir'")
+        }
+        if (config.executionMode == null) {
+          throw new Exception("Missing parameter '--executionMode'")
+        }
+
         LOG.info("Loading configuration from " + config.configDir)
         GlobalConfiguration.loadConfiguration(config.configDir)
         val configuration = GlobalConfiguration.getConfiguration

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
index a932977..905dbe3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManagerCLIConfiguration.scala
@@ -25,4 +25,4 @@ object ExecutionMode extends Enumeration{
 }
 
 case class JobManagerCLIConfiguration(configDir: String = null, 
-          executionMode: ExecutionMode.ExecutionMode = ExecutionMode.CLUSTER) {}
+          executionMode: ExecutionMode.ExecutionMode = null) {}

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 28c5bea..4630089 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -19,11 +19,14 @@
 package org.apache.flink.runtime.messages
 
 import org.apache.flink.runtime.accumulators.AccumulatorEvent
+import org.apache.flink.runtime.client.JobStatusMessage
 import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
 import org.apache.flink.runtime.instance.{InstanceID, Instance}
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobID, JobStatus, JobVertexID}
 import org.apache.flink.runtime.taskmanager.TaskExecutionState
 
+import scala.collection.JavaConverters._
+
 /**
  * The job manager specific actor messages
  */
@@ -202,21 +205,31 @@ object JobManagerMessages {
   /**
    * This message is the response to the [[RequestRunningJobs]] message. It contains all
    * execution graphs of the currently running jobs.
-   *
-   * @param runningJobs
    */
   case class RunningJobs(runningJobs: Iterable[ExecutionGraph]) {
     def this() = this(Seq())
     def asJavaIterable: java.lang.Iterable[ExecutionGraph] = {
-      import scala.collection.JavaConverters._
       runningJobs.asJava
     }
   }
 
   /**
-   * Requests the execution graph of a specific job identified by [[jobID]]. The result is sent
-   * back to the sender as a [[JobResponse]].
-   * @param jobID
+   * Requests the status of all currently running jobs from the job manager.
+   * This message triggers a [[RunningJobsStatus]] response.
+   */
+  case object RequestRunningJobsStatus
+
+  case class RunningJobsStatus(runningJobs: Iterable[JobStatusMessage]) {
+    def this() = this(Seq())
+
+    def getStatusMessages(): java.util.List[JobStatusMessage] = {
+      new java.util.ArrayList[JobStatusMessage](runningJobs.asJavaCollection)
+    }
+  }
+
+  /**
+   * Requests the execution graph of a specific job identified by [[jobID]].
+   * The result is sent back to the sender as a [[JobResponse]].
    */
   case class RequestJob(jobID: JobID)
 
@@ -283,7 +296,7 @@ object JobManagerMessages {
 
   case object JobManagerStatusAlive extends JobManagerStatus
 
-    // --------------------------------------------------------------------------
+  // --------------------------------------------------------------------------
   // Utility methods to allow simpler case object access from Java
   // --------------------------------------------------------------------------
   
@@ -302,7 +315,11 @@ object JobManagerMessages {
   def getRequestRunningJobs : AnyRef = {
     RequestRunningJobs
   }
-  
+
+  def getRequestRunningJobsStatus : AnyRef = {
+    RequestRunningJobsStatus
+  }
+
   def getRequestRegisteredTaskManagers : AnyRef = {
     RequestRegisteredTaskManagers
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 2dacf57..65517d3 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -94,6 +94,7 @@ public abstract class YarnTestBase {
 
 	// This code is taken from: http://stackoverflow.com/a/7201825/568695
 	// it changes the environment variables of this JVM. Use only for testing purposes!
+	@SuppressWarnings("unchecked")
 	private static void setEnv(Map<String, String> newenv) {
 		try {
 			Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
@@ -400,8 +401,12 @@ public abstract class YarnTestBase {
 					returnValue = yCli.run(args);
 					break;
 				case CLI_FRONTEND:
-					CliFrontend cli = new CliFrontend();
-					returnValue = cli.parseParameters(args);
+					try {
+						CliFrontend cli = new CliFrontend();
+						returnValue = cli.parseParameters(args);
+					} catch (Exception e) {
+						throw new RuntimeException(e);
+					}
 					break;
 				default:
 					throw new RuntimeException("Unknown type " + type);

http://git-wip-us.apache.org/repos/asf/flink/blob/5385e48d/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 4bc6014..966be7e 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -18,12 +18,12 @@
 
 package org.apache.flink.yarn
 
-import java.io.{PrintWriter, FileWriter, BufferedWriter}
+import java.io.{File, PrintWriter, FileWriter, BufferedWriter}
 import java.security.PrivilegedAction
 
 import akka.actor._
 import org.apache.flink.client.CliFrontend
-import org.apache.flink.configuration.{Configuration, ConfigConstants}
+import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
@@ -194,10 +194,12 @@ object ApplicationMaster {
     (Configuration, ActorSystem, ActorRef, ActorRef) = {
 
     LOG.info("Starting JobManager for YARN")
-    val args = Array[String]("--configDir", currDir)
+    LOG.info("Loading config from: {}", currDir)
 
-    LOG.info(s"Config path: $currDir.")
-    val (configuration, _, _, _) = JobManager.parseArgs(args)
+    GlobalConfiguration.loadConfiguration(currDir)
+    val configuration = GlobalConfiguration.getConfiguration()
+
+    configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, currDir)
 
     // add dynamic properties to JobManager configuration.
     val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)


Mime
View raw message