flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [2/4] flink git commit: [FLINK-2097][core] implement a job session management
Date Tue, 22 Sep 2015 19:57:18 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 70a49fd..889ce43 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -178,6 +178,9 @@ public class ExecutionGraph implements Serializable {
 
 	/** Flag that indicate whether the executed dataflow should be periodically snapshotted */
 	private boolean snapshotCheckpointsEnabled;
+
+	/** Flag to indicate whether the Graph has been archived */
+	private boolean isArchived = false;
 		
 
 	// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
@@ -326,6 +329,9 @@ public class ExecutionGraph implements Serializable {
 		return scheduleMode;
 	}
 
+	public boolean isArchived() {
+		return isArchived;
+	}
 	public void enableSnapshotCheckpointing(
 			long interval,
 			long checkpointTimeout,
@@ -779,6 +785,8 @@ public class ExecutionGraph implements Serializable {
 		requiredJarFiles.clear();
 		jobStatusListenerActors.clear();
 		executionListenerActors.clear();
+
+		isArchived = true;
 	}
 
 	public ExecutionConfig getExecutionConfig() {

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 7677a1b..e4a0209 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -70,15 +70,19 @@ public class JobGraph implements Serializable {
 
 	/** Set of blob keys identifying the JAR files required to run this job. */
 	private final List<BlobKey> userJarBlobKeys = new ArrayList<BlobKey>();
-	
-	/** ID of this job. */
+
+	/** ID of this job. May be set if specific job id is desired (e.g. session management) */
 	private final JobID jobID;
 
 	/** Name of this job. */
-	private String jobName;
+	private final String jobName;
 	
 	/** The number of times that failed tasks should be re-executed */
 	private int numExecutionRetries;
+
+	/** The number of seconds after which the corresponding ExecutionGraph is removed at the
+	 * job manager after it has been executed. */
+	private long sessionTimeout = 0;
 	
 	/** flag to enable queued scheduling */
 	private boolean allowQueuedScheduling;
@@ -86,7 +90,7 @@ public class JobGraph implements Serializable {
 	/** The mode in which the job is scheduled */
 	private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
 	
-	/** The settings for asynchronous snapshotting */
+	/** The settings for asynchronous snapshots */
 	private JobSnapshottingSettings snapshotSettings;
 
 	// --------------------------------------------------------------------------------------------
@@ -99,19 +103,19 @@ public class JobGraph implements Serializable {
 	}
 
 	/**
-	 * Constructs a new job graph with the given name and a random job ID.
+	 * Constructs a new job graph with the given name and a random job ID.
 	 * 
 	 * @param jobName The name of the job
 	 */
 	public JobGraph(String jobName) {
 		this(null, jobName);
 	}
-	
+
 	/**
-	 * Constructs a new job graph with the given name and a random job ID.
+	 * Constructs a new job graph with the given name and a random job ID if null supplied as an id.
 	 * 
-	 * @param jobId The id of the job
-	 * @param jobName The name of the job
+	 * @param jobId The id of the job. A random ID is generated, if {@code null} is passed.
+	 * @param jobName The name of the job.
 	 */
 	public JobGraph(JobID jobId, String jobName) {
 		this.jobID = jobId == null ? new JobID() : jobId;
@@ -119,7 +123,7 @@ public class JobGraph implements Serializable {
 	}
 	
 	/**
-	 * Constructs a new job graph with no name and a random job ID.
+	 * Constructs a new job graph with no name and a random job ID if null supplied as an id.
 	 * 
 	 * @param vertices The vertices to add to the graph.
 	 */
@@ -138,9 +142,9 @@ public class JobGraph implements Serializable {
 	}
 	
 	/**
-	 * Constructs a new job graph with the given name and a random job ID.
+	 * Constructs a new job graph with the given name and a random job ID if null supplied as an id.
 	 * 
-	 * @param jobId The id of the job.
+	 * @param jobId The id of the job. A random ID is generated, if {@code null} is passed.
 	 * @param jobName The name of the job.
 	 * @param vertices The vertices to add to the graph.
 	 */
@@ -162,7 +166,7 @@ public class JobGraph implements Serializable {
 	public JobID getJobID() {
 		return this.jobID;
 	}
-	
+
 	/**
 	 * Returns the name assigned to the job graph.
 	 * 
@@ -173,9 +177,10 @@ public class JobGraph implements Serializable {
 	}
 
 	/**
-	 * Returns the configuration object for this job if it is set.
+	 * Returns the configuration object for this job. Job-wide parameters should be set into that
+	 * configuration object.
 	 * 
-	 * @return the configuration object for this job, or <code>null</code> if it is not set
+	 * @return The configuration object for this job.
 	 */
 	public Configuration getJobConfiguration() {
 		return this.jobConfiguration;
@@ -190,7 +195,8 @@ public class JobGraph implements Serializable {
 	 */
 	public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
 		if (numberOfExecutionRetries < -1) {
-			throw new IllegalArgumentException("The number of execution retries must be non-negative, or -1 (use system default)");
+			throw new IllegalArgumentException(
+					"The number of execution retries must be non-negative, or -1 (use system default)");
 		}
 		this.numExecutionRetries = numberOfExecutionRetries;
 	}
@@ -205,11 +211,29 @@ public class JobGraph implements Serializable {
 	public int getNumberOfExecutionRetries() {
 		return numExecutionRetries;
 	}
+
+	/**
+	 * Gets the timeout after which the corresponding ExecutionGraph is removed at the
+	 * job manager after it has been executed.
+	 * @return a timeout as a long in seconds.
+	 */
+	public long getSessionTimeout() {
+		return sessionTimeout;
+	}
+
+	/**
+	 * Sets the timeout of the session in seconds. The timeout specifies how long a job will be kept
+	 * in the job manager after it finishes.
+	 * @param sessionTimeout The timeout in seconds
+	 */
+	public void setSessionTimeout(long sessionTimeout) {
+		this.sessionTimeout = sessionTimeout;
+	}
 	
 	public void setAllowQueuedScheduling(boolean allowQueuedScheduling) {
 		this.allowQueuedScheduling = allowQueuedScheduling;
 	}
-	
+
 	public boolean getAllowQueuedScheduling() {
 		return allowQueuedScheduling;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index c7ea06e..60aadf5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -187,7 +187,7 @@ public class TaskExecutionState implements java.io.Serializable {
 	
 	@Override
 	public String toString() {
-		return String.format("TaskState jobId=%s, executionId=%s, state=%s, error=%s", 
+		return String.format("TaskState jobId=%s, executionId=%s, state=%s, error=%s",
 				jobID, executionId, executionState,
 				throwable == null ? "(null)" : throwable.toString());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
index 26d7272..75ad20f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager
 
 import akka.actor.ActorRef
 
+
 /**
  * Utility class to store job information on the [[JobManager]]. The JobInfo stores which actor
  * submitted the job, when the start time and, if already terminated, the end time was.
@@ -29,7 +30,15 @@ import akka.actor.ActorRef
  * @param client Actor which submitted the job
  * @param start Starting time
  */
-class JobInfo(val client: ActorRef, val start: Long){
+class JobInfo(val client: ActorRef, val start: Long,
+              val sessionTimeout: Long) {
+
+  var sessionAlive = sessionTimeout > 0
+
+  var lastActive = 0L
+
+  setLastActive()
+
   var end: Long = -1
 
   def duration: Long = {
@@ -39,8 +48,13 @@ class JobInfo(val client: ActorRef, val start: Long){
       -1
     }
   }
+
+  def setLastActive() =
+    lastActive = System.currentTimeMillis()
 }
 
 object JobInfo{
-  def apply(client: ActorRef, start: Long) = new JobInfo(client, start)
+  def apply(client: ActorRef, start: Long,
+            sessionTimeout: Long) =
+    new JobInfo(client, start, sessionTimeout)
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d93b2ed..444ab0b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -74,6 +74,8 @@ import scala.concurrent.duration._
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.language.postfixOps
 import scala.collection.JavaConverters._
+import scala.concurrent.ExecutionContext.Implicits.global
+
 
 /**
  * The job manager is responsible for receiving Flink jobs, scheduling the tasks, gathering the
@@ -121,7 +123,7 @@ class JobManager(
 
   override val log = Logger(getClass)
 
-  /** List of current jobs running jobs */
+  /** Either running or not yet archived jobs (session hasn't been ended). */
   protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
 
   var leaderSessionID: Option[UUID] = None
@@ -426,7 +428,19 @@ class JobManager(
               }
             }
 
-            removeJob(jobID)
+
+            if (jobInfo.sessionAlive) {
+              jobInfo.setLastActive()
+              val lastActivity = jobInfo.lastActive
+              context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout seconds) {
+                // remove only if no activity occurred in the meantime
+                if (lastActivity == jobInfo.lastActive) {
+                  removeJob(jobID)
+                }
+              }
+            } else {
+              removeJob(jobID)
+            }
 
           }
         case None =>
@@ -555,6 +569,18 @@ class JobManager(
     case RequestJobManagerStatus =>
       sender() ! decorateMessage(JobManagerStatusAlive)
 
+    case RemoveCachedJob(jobID) =>
+      currentJobs.get(jobID) match {
+        case Some((graph, info)) =>
+          if (graph.getState.isTerminalState) {
+            removeJob(graph.getJobID)
+          } else {
+            // triggers removal upon completion of job
+            info.sessionAlive = false
+          }
+        case None =>
+      }
+
     case Disconnect(msg) =>
       val taskManager = sender()
 
@@ -624,19 +650,26 @@ class JobManager(
         }
 
         // see if there already exists an ExecutionGraph for the corresponding job ID
-        executionGraph = currentJobs.getOrElseUpdate(
-          jobGraph.getJobID,
-          (new ExecutionGraph(
-            executionContext,
-            jobGraph.getJobID,
-            jobGraph.getName,
-            jobGraph.getJobConfiguration(),
-            timeout,
-            jobGraph.getUserJarBlobKeys(),
-            userCodeLoader),
-            JobInfo(client, System.currentTimeMillis())
-          )
-        )._1
+        executionGraph = currentJobs.get(jobGraph.getJobID) match {
+          case Some((graph, jobInfo)) =>
+            jobInfo.setLastActive()
+            graph
+          case None =>
+            val graph = new ExecutionGraph(
+              executionContext,
+              jobGraph.getJobID,
+              jobGraph.getName,
+              jobGraph.getJobConfiguration,
+              timeout,
+              jobGraph.getUserJarBlobKeys,
+              userCodeLoader)
+            val jobInfo = JobInfo(
+              client,
+              System.currentTimeMillis(),
+              jobGraph.getSessionTimeout)
+            currentJobs.put(jobGraph.getJobID, (graph, jobInfo))
+            graph
+        }
 
         // configure the execution graph
         val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries() >= 0) {
@@ -990,25 +1023,26 @@ class JobManager(
    * @param jobID ID of the job to remove and archive
    */
   private def removeJob(jobID: JobID): Unit = {
-    currentJobs.remove(jobID) match {
-      case Some((eg, _)) =>
-        try {
-          eg.prepareForArchiving()
-
-          archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg))
-        } catch {
-          case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " +
-            "archiving.", t)
-        }
+    currentJobs.synchronized {
+      currentJobs.remove(jobID) match {
+        case Some((eg, _)) =>
+          try {
+            eg.prepareForArchiving()
+            archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg))
+          } catch {
+            case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " +
+              "archiving.", t)
+          }
 
-      case None =>
-    }
+        case None =>
+      }
 
-    try {
-      libraryCacheManager.unregisterJob(jobID)
-    } catch {
-      case t: Throwable =>
-        log.error(s"Could not properly unregister job $jobID form the library cache.", t)
+      try {
+        libraryCacheManager.unregisterJob(jobID)
+      } catch {
+        case t: Throwable =>
+          log.error(s"Could not properly unregister job $jobID from the library cache.", t)
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index d3fc8b1..bef52e0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -80,10 +80,13 @@ class MemoryArchivist(private val max_entries: Int)
   override def handleMessage: Receive = {
     
     /* Receive Execution Graph to archive */
-    case ArchiveExecutionGraph(jobID, graph) => 
-      // wrap graph inside a soft reference
-      graphs.update(jobID, graph)
-
+    case ArchiveExecutionGraph(jobID, graph) =>
+      // Keep lru order in case we override a graph (from multiple job submission in one session).
+      // This deletes old ExecutionGraph with this JobID from the history but avoids to store
+      // redundant ExecutionGraphs.
+      // TODO Allow ExecutionGraphs with the same jobID to be stored and displayed in web interface
+      graphs.remove(jobID)
+      graphs.put(jobID, graph)
       // update job counters
       graph.getState match {
         case JobStatus.FINISHED => finishedCnt += 1

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index d7bbb8d..3869392 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -275,6 +275,12 @@ object JobManagerMessages {
   case class JobNotFound(jobID: JobID) extends JobResponse with JobStatusResponse
 
   /**
+   * Removes the job belonging to the job identifier from the job manager and archives it.
+   * @param jobID The job identifier
+   */
+  case class RemoveCachedJob(jobID: JobID)
+
+  /**
    * Requests the instances of all registered task managers.
    */
   case object RequestRegisteredTaskManagers

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index bbd011a..839193b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -416,8 +416,8 @@ abstract class FlinkMiniCluster(
        jobManagerGateway,
        jobGraph,
        timeout,
-      printUpdates,
-      this.getClass().getClassLoader())
+       printUpdates,
+       this.getClass.getClassLoader())
     } finally {
        if(!useSingleActorSystem) {
          // we have to shutdown the just created actor system
@@ -440,7 +440,10 @@ abstract class FlinkMiniCluster(
         )
     }
 
-    JobClient.submitJobDetached(jobManagerGateway, jobGraph, timeout, getClass().getClassLoader())
+    JobClient.submitJobDetached(jobManagerGateway,
+      jobGraph,
+      timeout,
+      this.getClass.getClassLoader())
 
     new JobSubmissionResult(jobGraph.getJobID)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index fdbffaa..5753cde 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -107,7 +107,8 @@ public class PartialConsumePipelinedResultTest {
 				flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
 				jobGraph,
 				TestingUtils.TESTING_DURATION(),
-				false, this.getClass().getClassLoader());
+				false,
+				this.getClass().getClassLoader());
 	}
 
 	// ---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
index 5594bfe..075c1c2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTest.java
@@ -52,7 +52,7 @@ import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandP
 import static org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive;
 
 /**
- * Tests that the JobManager process properly exits when the JobManager actor dies.
+ * Tests that the TaskManager process properly exits when the TaskManager actor dies.
  */
 public class TaskManagerProcessReapingTest {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 74b7680..3a252f8 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager
 
+
 import Tasks._
 import akka.actor.ActorSystem
 import akka.actor.Status.{Success, Failure}
@@ -27,13 +28,13 @@ import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.runtime.jobgraph.{JobVertex, DistributionPattern, JobGraph, ScheduleMode}
 import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved
-import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
-import org.apache.flink.runtime.util.SerializedThrowable
+import org.apache.flink.runtime.testingUtils.{TestingUtils, ScalaTestingUtils}
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
+import org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableException, SlotSharingGroup}
+
 import org.junit.runner.RunWith
+import org.scalatest.{Matchers, BeforeAndAfterAll, WordSpecLike}
 import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
-import org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableException, SlotSharingGroup}
 
 import scala.concurrent.Await
 import scala.concurrent.duration._
@@ -617,6 +618,121 @@ class JobManagerITCase(_system: ActorSystem)
         cluster.stop()
       }
     }
+
+    "remove execution graphs when the client ends the session explicitly" in {
+      val vertex = new JobVertex("Test Vertex")
+      vertex.setInvokableClass(classOf[NoOpInvokable])
+
+      val jobGraph1 = new JobGraph("Test Job", vertex)
+
+      val slowVertex = new WaitingOnFinalizeJobVertex("Long running Vertex", 2000)
+      slowVertex.setInvokableClass(classOf[NoOpInvokable])
+
+      val jobGraph2 = new JobGraph("Long running Job", slowVertex)
+
+      val cluster = TestingUtils.startTestingCluster(1)
+      val jm = cluster.getLeaderGateway(1 seconds)
+
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          /* jobgraph1 is removed after being terminated */
+          jobGraph1.setSessionTimeout(9999)
+          jm.tell(SubmitJob(jobGraph1, ListeningBehaviour.EXECUTION_RESULT), self)
+          expectMsg(JobSubmitSuccess(jobGraph1.getJobID))
+          expectMsgType[JobResultSuccess]
+
+          // should not be archived yet
+          jm.tell(RequestExecutionGraph(jobGraph1.getJobID), self)
+          var cachedGraph = expectMsgType[ExecutionGraphFound].executionGraph
+          assert(!cachedGraph.isArchived)
+
+          jm.tell(RemoveCachedJob(jobGraph1.getJobID), self)
+
+          jm.tell(RequestExecutionGraph(jobGraph1.getJobID), self)
+          cachedGraph = expectMsgType[ExecutionGraphFound].executionGraph
+          assert(cachedGraph.isArchived)
+
+          /* jobgraph2 is removed while running */
+          jobGraph2.setSessionTimeout(9999)
+          jm.tell(SubmitJob(jobGraph2, ListeningBehaviour.EXECUTION_RESULT), self)
+          expectMsg(JobSubmitSuccess(jobGraph2.getJobID))
+
+          // job still running
+          jm.tell(RemoveCachedJob(jobGraph2.getJobID), self)
+
+          expectMsgType[JobResultSuccess]
+
+          // should be archived!
+          jm.tell(RequestExecutionGraph(jobGraph2.getJobID), self)
+          cachedGraph = expectMsgType[ExecutionGraphFound].executionGraph
+          assert(cachedGraph.isArchived)
+        }
+      } finally {
+        cluster.stop()
+      }
+    }
+
+    "remove execution graphs when the client's session times out" in {
+      val vertex = new JobVertex("Test Vertex")
+      vertex.setParallelism(1)
+      vertex.setInvokableClass(classOf[NoOpInvokable])
+
+      val jobGraph = new JobGraph("Test Job", vertex)
+
+      val cluster = TestingUtils.startTestingCluster(1)
+      val jm = cluster.getLeaderGateway(1 seconds)
+
+      try {
+        within(TestingUtils.TESTING_DURATION) {
+          // try multiple times in case of flaky environments
+          var testSucceeded = false
+          var numTries = 0
+          while(!testSucceeded && numTries < 10) {
+            try {
+              // should be removed immediately
+              jobGraph.setSessionTimeout(0)
+              jm.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self)
+              expectMsg(JobSubmitSuccess(jobGraph.getJobID))
+              expectMsgType[JobResultSuccess]
+
+              jm.tell(RequestExecutionGraph(jobGraph.getJobID), self)
+              val cachedGraph2 = expectMsgType[ExecutionGraphFound].executionGraph
+              assert(cachedGraph2.isArchived)
+
+              // removed after 2 seconds
+              jobGraph.setSessionTimeout(2)
+
+              jm.tell(SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), self)
+              expectMsg(JobSubmitSuccess(jobGraph.getJobID))
+              expectMsgType[JobResultSuccess]
+
+              // should not be archived yet
+              jm.tell(RequestExecutionGraph(jobGraph.getJobID), self)
+              val cachedGraph = expectMsgType[ExecutionGraphFound].executionGraph
+              assert(!cachedGraph.isArchived)
+
+              // wait until graph is archived
+              Thread.sleep(3000)
+
+              jm.tell(RequestExecutionGraph(jobGraph.getJobID), self)
+              val graph = expectMsgType[ExecutionGraphFound].executionGraph
+              assert(graph.isArchived)
+
+              testSucceeded = true
+            } catch {
+              case e: Throwable =>
+                numTries += 1
+            }
+          }
+          if(!testSucceeded) {
+            fail("Test case failed after " + numTries + " probes.")
+          }
+        }
+      } finally {
+        cluster.stop()
+      }
+    }
+
   }
 
   class WaitingOnFinalizeJobVertex(name: String, val waitingTime: Long) extends JobVertex(name){

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 5265134..745b0d3 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -22,7 +22,7 @@ import java.util.UUID
 import com.esotericsoftware.kryo.Serializer
 import org.apache.flink.api.common.io.{FileInputFormat, InputFormat}
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.common.{ExecutionConfig, JobExecutionResult}
+import org.apache.flink.api.common.{JobID, ExecutionConfig, JobExecutionResult}
 import org.apache.flink.api.java.io._
 import org.apache.flink.api.java.operators.DataSource
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
@@ -131,7 +131,7 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
    * Gets the UUID by which this environment is identified. The UUID sets the execution context
    * in the cluster or local environment.
    */
-  def getId: UUID = {
+  def getId: JobID = {
     javaEnv.getId
   }
 
@@ -148,6 +148,33 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
   /**
+   * Starts a new session, discarding all intermediate results.
+   */
+  def startNewSession() {
+    javaEnv.startNewSession()
+  }
+
+  /**
+   * Sets the session timeout to hold the intermediate results of a job. This only
+   * applies the updated timeout in future executions.
+   * @param timeout The timeout in seconds.
+   */
+  def setSessionTimeout(timeout: Long) {
+    javaEnv.setSessionTimeout(timeout)
+  }
+
+  /**
+   * Gets the session timeout for this environment. The session timeout defines for how long
+   * after an execution, the job and its intermediate results will be kept for future
+   * interactions.
+   *
+   * @return The session timeout, in seconds.
+   */
+  def getSessionTimeout: Long = {
+    javaEnv.getSessionTimeout
+  }
+
+  /**
    * Registers the given type with the serializer at the [[KryoSerializer]].
    *
    * Note that the serializer instance must be serializable (as defined by java.io.Serializable),

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index f691806..e2d91af 100644
--- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -19,12 +19,21 @@
 package org.apache.flink.api.avro;
 
 import java.io.File;
+import java.net.InetAddress;
 
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.client.CliFrontend;
+import org.apache.flink.client.RemoteExecutor;
 import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -32,35 +41,34 @@ import org.junit.Test;
 public class AvroExternalJarProgramITCase {
 
 	private static final String JAR_FILE = "target/maven-test-jar.jar";
-	
+
 	private static final String TEST_DATA_FILE = "/testdata.avro";
 
 	@Test
 	public void testExternalProgram() {
-		
+
 		ForkableFlinkMiniCluster testMiniCluster = null;
-		
+
 		try {
 			Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
 			testMiniCluster = new ForkableFlinkMiniCluster(config, false);
 			testMiniCluster.start();
-			
+
 			String jarFile = JAR_FILE;
 			String testData = getClass().getResource(TEST_DATA_FILE).toString();
-			
+
 			PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
 
+
 			config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
 			config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, testMiniCluster.getLeaderRPCPort());
-						
-			Client c = new Client(
-					config,
-					program.getUserCodeClassLoader(),
-					-1);
-
-			c.setPrintStatusDuringExecution(false);
-			c.run(program, 4, true);
+
+			Client client = new Client(config);
+
+			client.setPrintStatusDuringExecution(false);
+			client.runBlocking(program, 4);
+
 		}
 		catch (Throwable t) {
 			System.err.println(t.getMessage());
@@ -71,7 +79,9 @@ public class AvroExternalJarProgramITCase {
 			if (testMiniCluster != null) {
 				try {
 					testMiniCluster.stop();
-				} catch (Throwable t) {}
+				} catch (Throwable t) {
+					// ignore
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index ae2c047..29439f6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -31,10 +30,12 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
+	
 	private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);
 
 	private final String host;
@@ -117,17 +118,17 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host);
 		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port);
 
+		Client client;
 		try {
-			Client client = new Client(configuration, usercodeClassLoader, -1);
+			client = new Client(configuration);
 			client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
-			
-			JobSubmissionResult result = client.run(jobGraph, true);
-			if (result instanceof JobExecutionResult) {
-				return (JobExecutionResult) result;
-			} else {
-				LOG.warn("The Client didn't return a JobExecutionResult");
-				return new JobExecutionResult(result.getJobID(), -1, null);
-			}
+		}
+		catch (Exception e) {
+			throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), e);
+		}
+
+		try {
+			return client.runBlocking(jobGraph, usercodeClassLoader);
 		}
 		catch (ProgramInvocationException e) {
 			throw e;
@@ -136,6 +137,9 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 			String term = e.getMessage() == null ? "." : (": " + e.getMessage());
 			throw new ProgramInvocationException("The program execution failed" + term, e);
 		}
+		finally {
+			client.shutdown();
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index c2335d6..2b2a426 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -23,10 +23,12 @@ import java.util.List;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,17 +36,25 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamContextEnvironment.class);
 
-	protected List<File> jars;
-	protected Client client;
+	private final List<File> jars;
+	
+	private final Client client;
+
+	private final ClassLoader userCodeClassLoader;
+	
 	private final boolean wait;
 
 	protected StreamContextEnvironment(Client client, List<File> jars, int parallelism, boolean wait) {
 		this.client = client;
 		this.jars = jars;
 		this.wait = wait;
+		
+		this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(jars, getClass().getClassLoader());
+		
 		if (parallelism > 0) {
 			setParallelism(parallelism);
-		} else {
+		}
+		else {
 			// first check for old parallelism config key
 			setParallelism(GlobalConfiguration.getInteger(
 					ConfigConstants.DEFAULT_PARALLELISM_KEY_OLD,
@@ -73,16 +83,18 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 
 		transformations.clear();
 
+		// attach all necessary jar files to the JobGraph
 		for (File file : jars) {
 			jobGraph.addJar(new Path(file.getAbsolutePath()));
 		}
 
-		JobSubmissionResult result = client.run(jobGraph, wait);
-		if(result instanceof JobExecutionResult) {
-			return (JobExecutionResult) result;
+		// execute the programs
+		if (wait) {
+			return client.runBlocking(jobGraph, userCodeClassLoader);
 		} else {
-			LOG.warn("The Client didn't return a JobExecutionResult");
-			return new JobExecutionResult(result.getJobID(), -1, null);
+			JobSubmissionResult result = client.runDetached(jobGraph, userCodeClassLoader);
+			LOG.warn("Job was executed in detached mode, the results will be available on completion.");
+			return JobExecutionResult.fromJobSubmissionResult(result);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ff785c4..c50f23e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.InputFormat;
@@ -40,9 +41,9 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
 import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.FileStateHandle;
@@ -1282,11 +1283,11 @@ public abstract class StreamExecutionEnvironment {
 	 *
 	 * @param jobName
 	 * 		Desired name of the job
-	 * @return The result of the job execution, containing elapsed time and
-	 * accumulators.
+	 * @return The result of the job execution: Either JobSubmissionResult or JobExecutionResult;
+	 * The latter contains elapsed time and accumulators.
 	 * @throws Exception
 	 */
-	public abstract JobExecutionResult execute(String jobName) throws Exception;
+	public abstract JobSubmissionResult execute(String jobName) throws Exception;
 
 	/**
 	 * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index 8c1408e..e5ea2c5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -19,9 +19,8 @@ package org.apache.flink.streaming.api.environment;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
-import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -68,6 +67,6 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment {
 			((PreviewPlanEnvironment) env).setPreview(streamGraph.getStreamingPlanAsJSON());
 		}
 
-		throw new Client.ProgramAbortException();
+		throw new OptimizerPlanEnvironment.ProgramAbortException();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
index 6d1b1c7..4c091e5 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
@@ -68,4 +68,9 @@ public class LocalTezEnvironment extends ExecutionEnvironment {
 		};
 		initializeContextEnvironment(factory);
 	}
+
+	@Override
+	public void startNewSession() throws Exception {
+		throw new UnsupportedOperationException("Session management is not implemented in Flink on Tez.");
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
index b155527..a02b536 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
@@ -75,4 +75,9 @@ public class RemoteTezEnvironment extends ExecutionEnvironment {
 		compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new org.apache.flink.configuration.Configuration());
 		executor = new TezExecutor(compiler, this.getDegreeOfParallelism());
 	}
+
+	@Override
+	public void startNewSession() throws Exception {
+		throw new UnsupportedOperationException("Session management is not implemented in Flink on Tez.");
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
index a54724f..60449db 100644
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
+++ b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
@@ -21,6 +21,7 @@ package org.apache.flink.tez.client;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.configuration.Configuration;
@@ -115,6 +116,26 @@ public class TezExecutor extends PlanExecutor {
 	}
 
 	@Override
+	public void start() throws Exception {
+		throw new IllegalStateException("Session management is not supported in the TezExecutor.");
+	}
+
+	@Override
+	public void stop() throws Exception {
+		throw new IllegalStateException("Session management is not supported in the TezExecutor.");
+	}
+
+	@Override
+	public void endSession(JobID jobID) throws Exception {
+		throw new IllegalStateException("Session management is not supported in the TezExecutor.");
+	}
+
+	@Override
+	public boolean isRunning() {
+		return false;
+	}
+
+	@Override
 	public JobExecutionResult executePlan(Plan plan) throws Exception {
 		return executePlanWithConf(tezConf, plan);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
index e8b7e86..566573e 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java
@@ -55,6 +55,10 @@ public class TestEnvironment extends ExecutionEnvironment {
 	}
 
 	@Override
+	public void startNewSession() throws Exception {
+	}
+
+	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
 		try {
 			OptimizedPlan op = compileProgram(jobName);

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
index 3991ac0..3e2657c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/clients/examples/LocalExecutorITCase.java
@@ -55,7 +55,8 @@ public class LocalExecutorITCase {
 			executor.setTaskManagerNumSlots(parallelism);
 			executor.setPrintStatusDuringExecution(false);
 			executor.start();
-			Plan wcPlan = wc.getPlan(Integer.valueOf(parallelism).toString(), inFile.toURI().toString(),outFile.toURI().toString());
+			Plan wcPlan = wc.getPlan(Integer.valueOf(parallelism).toString(),
+					inFile.toURI().toString(), outFile.toURI().toString());
 			wcPlan.setExecutionConfig(new ExecutionConfig());
 			executor.executePlan(wcPlan);
 			executor.stop();

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
index 68b099d..b1768f0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
@@ -102,7 +102,7 @@ public class RemoteEnvironmentITCase {
 		try {
 			env.execute();
 			Assert.fail("Program should not run successfully, cause of invalid akka settings.");
-		} catch (ProgramInvocationException ex) {
+		} catch (IOException ex) {
 			throw ex.getCause();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
index 082532e..24d9416 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.test.optimizer.jsonplan;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.client.program.Client.ProgramAbortException;
-import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
+import org.apache.flink.client.program.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PreviewPlanEnvironment;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.examples.java.clustering.KMeans;
@@ -66,7 +66,7 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
 		try {
 			// <points path> <centers path> <result path> <num iterations
 			KMeans.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "123"});
-		} catch(ProgramAbortException pae) {
+		} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
 			// all good.
 		} catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
index 121bc88..a862242 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/JsonJobGraphGenerationTest.java
@@ -311,6 +311,10 @@ public class JsonJobGraphGenerationTest {
 		}
 
 		@Override
+		public void startNewSession() throws Exception {
+		}
+
+		@Override
 		public JobExecutionResult execute(String jobName) throws Exception {
 			Plan plan = createProgramPlan(jobName);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index b0de0e8..6aea26c 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.api.scala.runtime.jobmanager
 
-import akka.actor.Status.Success
 import akka.actor.{ActorSystem, PoisonPill}
 import akka.testkit.{ImplicitSender, TestKit}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
index fb8e5a1..3337f1a 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
@@ -507,7 +507,7 @@ public abstract class YarnTestBase extends TestLogger {
 				"expected string did not show up", expectedStringSeen);
 
 		// check for 0 return code
-		Assert.assertTrue("Expecting return value == "+returnCode, runner.getReturnValue() == returnCode);
+		Assert.assertEquals("Expected return value", returnCode, runner.getReturnValue());
 		LOG.info("Test was successful");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/71bf2f57/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
index 57a5010..bd1698a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
@@ -25,6 +25,7 @@ import static akka.pattern.Patterns.ask;
 import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.net.NetUtils;
@@ -231,6 +232,7 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster {
 	 */
 	@Override
 	public void stopAfterJob(JobID jobID) {
+		Preconditions.checkNotNull("The job id must not be null", jobID);
 		Future<Object> messageReceived = ask(applicationClient, new Messages.StopAMAfterJob(jobID), akkaTimeout);
 		try {
 			Await.result(messageReceived, akkaDuration);


Mime
View raw message