flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/2] flink git commit: [FLINK-6630] [FLINK-6631] Implement FLIP-6 Mesos cluster entrypoints + MesosTaskExecutorRunner
Date Sat, 19 Aug 2017 15:25:51 GMT
Repository: flink
Updated Branches:
  refs/heads/master 76f102288 -> bbac4a6c9


http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 4013e83..cea1688 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -80,7 +80,7 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 	}
 
 	@Override
-	protected void shutDown(boolean cleanupHaData) throws FlinkException {
+	protected void stopClusterComponents(boolean cleanupHaData) throws Exception {
 		Throwable exception = null;
 
 		if (dispatcher != null) {
@@ -99,12 +99,6 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			}
 		}
 
-		try {
-			super.shutDown(cleanupHaData);
-		} catch (Throwable t) {
-			exception = ExceptionUtils.firstOrSuppressed(t, exception);
-		}
-
 		if (exception != null) {
 			throw new FlinkException("Could not properly shut down the session cluster entry point.", exception);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/bbac4a6c/flink-tests/src/test/java/org/apache/flink/test/runtime/entrypoint/StreamingNoop.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/entrypoint/StreamingNoop.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/entrypoint/StreamingNoop.java
new file mode 100644
index 0000000..cd88ae1
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/entrypoint/StreamingNoop.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime.entrypoint;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.ObjectOutputStream;
+
+/**
+ * A program to generate a job graph for entrypoint testing purposes.
+ *
+ * <p>The dataflow is a simple streaming program that continuously monitors a (non-existent) directory.
+ * Note that the job graph doesn't depend on any user code; it uses in-built Flink classes only.
+ *
+ * <p>Program arguments:
+ *  --output [graph file] (default: 'job.graph')
+ */
+public class StreamingNoop {
+	public static void main(String[] args) throws Exception {
+		ParameterTool params = ParameterTool.fromArgs(args);
+
+		// define the dataflow
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(2);
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 1000));
+		env.readFileStream("input/", 60000, FileMonitoringFunction.WatchType.ONLY_NEW_FILES)
+			.addSink(new DiscardingSink<String>());
+
+		// generate a job graph
+		final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+		File jobGraphFile = new File(params.get("output", "job.graph"));
+		try (FileOutputStream output = new FileOutputStream(jobGraphFile);
+			ObjectOutputStream obOutput = new ObjectOutputStream(output)){
+			obOutput.writeObject(jobGraph);
+		}
+	}
+}


Mime
View raw message