Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8C6551149E for ; Sun, 21 Sep 2014 02:13:11 +0000 (UTC) Received: (qmail 85957 invoked by uid 500); 21 Sep 2014 02:13:11 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 85931 invoked by uid 500); 21 Sep 2014 02:13:11 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 85912 invoked by uid 99); 21 Sep 2014 02:13:11 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 21 Sep 2014 02:13:11 +0000 X-ASF-Spam-Status: No, hits=-2000.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Sun, 21 Sep 2014 02:12:45 +0000 Received: (qmail 83170 invoked by uid 99); 21 Sep 2014 02:12:25 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 21 Sep 2014 02:12:25 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 66D77A1DDF3; Sun, 21 Sep 2014 02:12:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.incubator.apache.org Date: Sun, 21 Sep 2014 02:12:33 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [09/63] [abbrv] Refactor job graph construction to incremental attachment based X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatOutputVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatOutputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatOutputVertex.java deleted file mode 100644 index 08a03bc..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatOutputVertex.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobgraph; - -import org.apache.flink.api.common.io.OutputFormat; -import org.apache.flink.api.common.operators.util.UserCodeWrapper; -import org.apache.flink.runtime.operators.util.TaskConfig; - -/** - * A JobOutputVertex is a specific sub-type of a {@link AbstractJobOutputVertex} and is designed - * for Nephele tasks which sink data in a not further specified way. As every job output vertex, - * a JobOutputVertex must not have any further output. - */ -public class OutputFormatOutputVertex extends AbstractJobOutputVertex { - /** - * Contains the output format associated to this output vertex. It can be
null
. - */ - private OutputFormat outputFormat; - - - /** - * Creates a new job file output vertex with the specified name. - * - * @param name - * the name of the new job file output vertex - * @param jobGraph - * the job graph this vertex belongs to - */ - public OutputFormatOutputVertex(String name, JobGraph jobGraph) { - this(name, null, jobGraph); - } - - public OutputFormatOutputVertex(String name, JobVertexID id, JobGraph jobGraph) { - super(name, id, jobGraph); - } - - /** - * Creates a new job file input vertex. - * - * @param jobGraph - * the job graph this vertex belongs to - */ - public OutputFormatOutputVertex(JobGraph jobGraph) { - this(null, jobGraph); - } - - public void setOutputFormat(OutputFormat format) { - this.outputFormat = format; - } - - public void initializeOutputFormatFromTaskConfig(ClassLoader cl) { - TaskConfig cfg = new TaskConfig(getConfiguration()); - UserCodeWrapper> wrapper = cfg.>getStubWrapper(cl); - - if (wrapper != null) { - this.outputFormat = wrapper.getUserCodeObject(OutputFormat.class, cl); - this.outputFormat.configure(cfg.getStubParameters()); - } - } - - /** - * Returns the output format. It can also be
null
. - * - * @return output format or
null
- */ - public OutputFormat getOutputFormat() { return outputFormat; } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java new file mode 100644 index 0000000..029d109 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/OutputFormatVertex.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph; + +import org.apache.flink.api.common.io.InitializeOnMaster; +import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.api.common.operators.util.UserCodeWrapper; +import org.apache.flink.runtime.operators.util.TaskConfig; + +/** + * A task vertex that run an initialization on the master, trying to deserialize an output format + * and initializing it on master, if necessary. + */ +public class OutputFormatVertex extends AbstractJobVertex { + + private static final long serialVersionUID = 1L; + + + /** Caches the output format associated to this output vertex. */ + private transient OutputFormat outputFormat; + + /** + * Creates a new task vertex with the specified name. + * + * @param name The name of the task vertex. + */ + public OutputFormatVertex(String name) { + super(name); + } + + + @Override + public void initializeOnMaster(ClassLoader loader) throws Exception { + if (this.outputFormat == null) { + TaskConfig cfg = new TaskConfig(getConfiguration()); + UserCodeWrapper> wrapper = cfg.>getStubWrapper(loader); + + if (wrapper == null) { + throw new Exception("No output format present in OutputFormatVertex's task configuration."); + } + + this.outputFormat = wrapper.getUserCodeObject(OutputFormat.class, loader); + this.outputFormat.configure(cfg.getStubParameters()); + } + + if (this.outputFormat instanceof InitializeOnMaster) { + ((InitializeOnMaster) this.outputFormat).initializeGlobal(getParallelism()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleInputVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleInputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleInputVertex.java deleted file mode 100644 index 3699f0e..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleInputVertex.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobgraph; - -import org.apache.flink.core.io.InputSplit; - - -public class SimpleInputVertex extends AbstractJobInputVertex { - - /** - * Creates a new job file output vertex with the specified name. - * - * @param name - * the name of the new job file output vertex - * @param jobGraph - * the job graph this vertex belongs to - */ - public SimpleInputVertex(String name, JobGraph jobGraph) { - this(name, null, jobGraph); - } - - public SimpleInputVertex(String name, JobVertexID id, JobGraph jobGraph) { - super(name, id, jobGraph); - } - - /** - * Creates a new job file input vertex. - * - * @param jobGraph - * the job graph this vertex belongs to - */ - public SimpleInputVertex(JobGraph jobGraph) { - this(null, jobGraph); - } - - @Override - public Class getInputSplitType() { - return null; - } - - @Override - public InputSplit[] getInputSplits(int minNumSplits) throws Exception { - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleOutputVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleOutputVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleOutputVertex.java deleted file mode 100644 index 8709a07..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SimpleOutputVertex.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobgraph; - -/** - * A JobOutputVertex is a specific sub-type of a {@link AbstractJobOutputVertex} and is designed - * for Nephele tasks which sink data in a not further specified way. As every job output vertex, - * a JobOutputVertex must not have any further output. - */ -public class SimpleOutputVertex extends AbstractJobOutputVertex { - - /** - * Creates a new job file output vertex with the specified name. - * - * @param name - * the name of the new job file output vertex - * @param jobGraph - * the job graph this vertex belongs to - */ - public SimpleOutputVertex(String name, JobGraph jobGraph) { - this(name, null, jobGraph); - } - - public SimpleOutputVertex(String name, JobVertexID id, JobGraph jobGraph) { - super(name, id, jobGraph); - } - - /** - * Creates a new job file input vertex. - * - * @param jobGraph - * the job graph this vertex belongs to - */ - public SimpleOutputVertex(JobGraph jobGraph) { - this(null, jobGraph); - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java index d3ad516..aab9c89 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java @@ -29,7 +29,7 @@ public abstract class AbstractInvokable { /** * The environment assigned to this invokable. */ - private volatile Environment environment = null; + private volatile Environment environment; /** * Must be overwritten by the concrete task to instantiate the required record reader and record writer. @@ -60,7 +60,6 @@ public abstract class AbstractInvokable { * * @return the environment of this task or null if the environment has not yet been set */ - // TODO: This method should be final public Environment getEnvironment() { return this.environment; } @@ -72,7 +71,6 @@ public abstract class AbstractInvokable { * @return the current number of subtasks the respective task is split into */ public final int getCurrentNumberOfSubtasks() { - return this.environment.getCurrentNumberOfSubtasks(); } @@ -82,7 +80,6 @@ public abstract class AbstractInvokable { * @return the index of this subtask in the subtask group */ public final int getIndexInSubtaskGroup() { - return this.environment.getIndexInSubtaskGroup(); } @@ -92,7 +89,6 @@ public abstract class AbstractInvokable { * @return the task configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.AbstractJobVertex} */ public final Configuration getTaskConfiguration() { - return this.environment.getTaskConfiguration(); } @@ -102,40 +98,10 @@ public abstract class AbstractInvokable { * @return the job configuration object which was attached to the original {@link org.apache.flink.runtime.jobgraph.JobGraph} */ public final Configuration getJobConfiguration() { - return this.environment.getJobConfiguration(); } /** - * This method should be called by the user code if a custom - * user thread has been started. - * - * @param userThread - * the user thread which has been started - */ - public final void userThreadStarted(Thread userThread) { - - if (this.environment != null) { - this.environment.userThreadStarted(userThread); - } - - } - - /** - * This method should be called by the user code if a custom - * user thread has finished. - * - * @param userThread - * the user thread which has finished - */ - public final void userThreadFinished(Thread userThread) { - - if (this.environment != null) { - this.environment.userThreadFinished(userThread); - } - } - - /** * This method is called when a task is canceled either as a result of a user abort or an execution failure. It can * be overwritten to respond to shut down the user code properly. * @@ -143,7 +109,6 @@ public abstract class AbstractInvokable { * thrown if any exception occurs during the execution of the user code */ public void cancel() throws Exception { - // The default implementation does nothing. } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitIterator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitIterator.java index 7aa3374..94e6cab 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitIterator.java @@ -24,7 +24,7 @@ import java.util.Iterator; import org.apache.flink.core.io.InputSplit; /** - * The input split iterator allows an {@link AbstractInputTask} to iterator over all input splits it is supposed to + * The input split iterator allows a task to iterate over all input splits it is supposed to * consume. Internally, the input split iterator calls an {@link InputSplitProvider} on each next call in * order to facilitate lazy split assignment. * @@ -72,7 +72,6 @@ public class InputSplitIterator implements Iterator { @SuppressWarnings("unchecked") @Override public T next() { - T retVal = null; if (this.nextInputSplit == null) { @@ -88,8 +87,6 @@ public class InputSplitIterator implements Iterator { @Override public void remove() { - throw new RuntimeException("The InputSplitIterator does not implement the remove method"); } - } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java index 22722e7..20a4ab1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java @@ -16,23 +16,21 @@ * limitations under the License. */ - package org.apache.flink.runtime.jobgraph.tasks; import org.apache.flink.core.io.InputSplit; /** - * An input split provider can be successively queried to provide a series of {@link InputSplit} objects an - * {@link AbstractInputTask} is supposed to consume in the course of its execution. - * + * An input split provider can be successively queried to provide a series of {@link InputSplit} objects a + * task is supposed to consume in the course of its execution. */ public interface InputSplitProvider { /** - * Requests the next input split to be consumed by the calling {@link AbstractInputTask}. + * Requests the next input split to be consumed by the calling task. * - * @return the next input split to be consumed by the calling {@link AbstractInputTask} or null if the - * {@link AbstractInputTask} shall not consume any further input splits. + * @return the next input split to be consumed by the calling task or null if the + * task shall not consume any further input splits. */ InputSplit getNextInputSplit(); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DeploymentManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DeploymentManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DeploymentManager.java deleted file mode 100644 index b8d9557..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DeploymentManager.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.jobmanager; - -import java.util.List; - -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.jobgraph.JobID; - -/** - * A deployment manager is responsible for deploying a list of {@link ExecutionVertex} objects the given - * {@link org.apache.flink.runtime.instance.Instance}. It is called by a {@link org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler} implementation whenever at least one - * {@link ExecutionVertex} has become ready to be executed. - * - */ -public interface DeploymentManager { - - /** - * Deploys the list of vertices on the given {@link org.apache.flink.runtime.instance.Instance}. - * - * @param jobID - * the ID of the job the vertices to be deployed belong to - * @param instance - * the instance on which the vertices shall be deployed - * @param verticesToBeDeployed - * the list of vertices to be deployed - */ - void deploy(JobID jobID, Instance instance, List verticesToBeDeployed); -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java index 6800a68..d36659f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/EventCollector.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.runtime.jobmanager; import java.util.ArrayList; @@ -32,27 +31,20 @@ import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent; import org.apache.flink.runtime.event.job.JobEvent; import org.apache.flink.runtime.event.job.ManagementEvent; import org.apache.flink.runtime.event.job.RecentJobEvent; -import org.apache.flink.runtime.event.job.VertexAssignmentEvent; import org.apache.flink.runtime.event.job.VertexEvent; import org.apache.flink.runtime.execution.ExecutionListener; -import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.execution.ExecutionState2; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionGraphIterator; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.executiongraph.ExecutionVertexID; -import org.apache.flink.runtime.executiongraph.InternalJobStatus; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.executiongraph.ManagementGraphFactory; -import org.apache.flink.runtime.executiongraph.VertexAssignmentListener; -import org.apache.flink.runtime.instance.AllocatedResource; -import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.jobgraph.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.archive.ArchiveListener; import org.apache.flink.runtime.managementgraph.ManagementGraph; import org.apache.flink.runtime.managementgraph.ManagementVertex; -import org.apache.flink.runtime.managementgraph.ManagementVertexID; import org.apache.flink.runtime.profiling.ProfilingListener; import org.apache.flink.runtime.profiling.types.ProfilingEvent; @@ -62,7 +54,6 @@ import org.apache.flink.runtime.profiling.types.ProfilingEvent; * the event collector removes all intervals which are older than the interval. *

* This class is thread-safe. - * */ public final class EventCollector extends TimerTask implements ProfilingListener { @@ -72,85 +63,43 @@ public final class EventCollector extends TimerTask implements ProfilingListener * the data provided by the executionStateChanged callback method. * However, these IDs are needed to create the construct the {@link VertexEvent} and the * {@link ExecutionStateChangeEvent}. - * */ private static final class ExecutionListenerWrapper implements ExecutionListener { - /** - * The event collector to forward the created event to. - */ + /** The event collector to forward the created event to. */ private final EventCollector eventCollector; - /** - * The vertex this listener belongs to. - */ - private final ExecutionVertex vertex; + private final ExecutionGraph graph; + - /** - * Constructs a new execution listener object. - * - * @param eventCollector - * the event collector to forward the created event to - * @param vertex - * the vertex this listener belongs to. - */ - public ExecutionListenerWrapper(final EventCollector eventCollector, final ExecutionVertex vertex) { + public ExecutionListenerWrapper(EventCollector eventCollector, ExecutionGraph graph) { this.eventCollector = eventCollector; - this.vertex = vertex; + this.graph = graph; } - /** - * {@inheritDoc} - */ @Override - public void executionStateChanged(final JobID jobID, final ExecutionVertexID vertexID, - final ExecutionState newExecutionState, final String optionalMessage) { - + public void executionStateChanged(JobID jobID, JobVertexID vertexId, int subtask, ExecutionAttemptID executionId, + ExecutionState2 newExecutionState, String optionalMessage) + { final long timestamp = System.currentTimeMillis(); - final JobVertexID jobVertexID = this.vertex.getGroupVertex().getJobVertexID(); - final String taskName = this.vertex.getGroupVertex().getName(); - final int totalNumberOfSubtasks = this.vertex.getGroupVertex().getCurrentNumberOfGroupMembers(); - final int indexInSubtaskGroup = this.vertex.getIndexInVertexGroup(); + final ExecutionJobVertex vertex = graph.getJobVertex(vertexId); + + final String taskName = vertex == null ? "(null)" : vertex.getJobVertex().getName(); + final int totalNumberOfSubtasks = vertex == null ? -1 : vertex.getParallelism(); // Create a new vertex event - final VertexEvent vertexEvent = new VertexEvent(timestamp, jobVertexID, taskName, totalNumberOfSubtasks, - indexInSubtaskGroup, newExecutionState, optionalMessage); + final VertexEvent vertexEvent = new VertexEvent(timestamp, vertexId, taskName, totalNumberOfSubtasks, + subtask, newExecutionState, optionalMessage); this.eventCollector.addEvent(jobID, vertexEvent); final ExecutionStateChangeEvent executionStateChangeEvent = new ExecutionStateChangeEvent(timestamp, - vertexID.toManagementVertexID(), newExecutionState); + vertexId.toManagementVertexId(subtask), newExecutionState); this.eventCollector.updateManagementGraph(jobID, executionStateChangeEvent, optionalMessage); this.eventCollector.addEvent(jobID, executionStateChangeEvent); } - - /** - * {@inheritDoc} - */ - @Override - public void userThreadStarted(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) { - // Nothing to do here - } - - /** - * {@inheritDoc} - */ - @Override - public void userThreadFinished(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) { - // Nothing to do here - } - - /** - * {@inheritDoc} - */ - @Override - public int getPriority() { - - return 20; - } - } /** @@ -162,24 +111,16 @@ public final class EventCollector extends TimerTask implements ProfilingListener */ private static final class JobStatusListenerWrapper implements JobStatusListener { - /** - * The event collector to forward the created event to. - */ + /** The event collector to forward the created event to. */ private final EventCollector eventCollector; - /** - * The name of the job this wrapper has been created for. - */ + /** The name of the job this wrapper has been created for. */ private final String jobName; - /** - * true if profiling events are collected for the job, false otherwise. - */ + /** true if profiling events are collected for the job, false otherwise. */ private final boolean isProfilingAvailable; - /** - * The time stamp of the job submission - */ + /** The time stamp of the job submission */ private final long submissionTimestamp; /** @@ -194,101 +135,32 @@ public final class EventCollector extends TimerTask implements ProfilingListener * @param submissionTimestamp * the submission time stamp of the job */ - public JobStatusListenerWrapper(final EventCollector eventCollector, final String jobName, - final boolean isProfilingAvailable, final long submissionTimestamp) { - + public JobStatusListenerWrapper(EventCollector eventCollector, String jobName, + boolean isProfilingAvailable, long submissionTimestamp) + { this.eventCollector = eventCollector; this.jobName = jobName; this.isProfilingAvailable = isProfilingAvailable; this.submissionTimestamp = submissionTimestamp; } - /** - * {@inheritDoc} - */ @Override - public void jobStatusHasChanged(final ExecutionGraph executionGraph, final InternalJobStatus newJobStatus, - final String optionalMessage) { + public void jobStatusHasChanged(ExecutionGraph executionGraph, JobStatus newJobStatus, String optionalMessage) { final JobID jobID = executionGraph.getJobID(); - if (newJobStatus == InternalJobStatus.SCHEDULED) { + if (newJobStatus == JobStatus.RUNNING) { final ManagementGraph managementGraph = ManagementGraphFactory.fromExecutionGraph(executionGraph); this.eventCollector.addManagementGraph(jobID, managementGraph); } // Update recent job event - final JobStatus jobStatus = InternalJobStatus.toJobStatus(newJobStatus); - if (jobStatus != null) { - this.eventCollector.updateRecentJobEvent(jobID, this.jobName, this.isProfilingAvailable, - this.submissionTimestamp, jobStatus); - - this.eventCollector.addEvent(jobID, - new JobEvent(System.currentTimeMillis(), jobStatus, optionalMessage)); - } - } - } - - /** - * The vertex assignment listener wrapper is an auxiliary class. It is required - * because the job ID cannot be accessed from the data provided by the vertexAssignmentChanged callback - * method. However, this job ID is needed to prepare the {@link VertexAssignmentEvent} for transmission. - * - */ - private static final class VertexAssignmentListenerWrapper implements VertexAssignmentListener { - - /** - * The event collector to forward the created event to. - */ - private final EventCollector eventCollector; - - /** - * The ID the job this wrapper has been created for. - */ - private final JobID jobID; - - /** - * Constructs a new vertex assignment listener wrapper. - * - * @param eventCollector - * the event collector to forward the events to - * @param jobID - * the ID of the job - */ - public VertexAssignmentListenerWrapper(final EventCollector eventCollector, final JobID jobID) { - this.eventCollector = eventCollector; - this.jobID = jobID; - } - - /** - * {@inheritDoc} - */ - @Override - public void vertexAssignmentChanged(final ExecutionVertexID id, final AllocatedResource newAllocatedResource) { - - // Create a new vertex assignment event - final ManagementVertexID managementVertexID = id.toManagementVertexID(); - final long timestamp = System.currentTimeMillis(); - - final Instance instance = newAllocatedResource.getInstance(); - VertexAssignmentEvent event; - if (instance == null) { - event = new VertexAssignmentEvent(timestamp, managementVertexID, "null"); - } else { - - String instanceName = null; - if (instance.getInstanceConnectionInfo() != null) { - instanceName = instance.getInstanceConnectionInfo().toString(); - } else { - instanceName = instance.toString(); - } + this.eventCollector.updateRecentJobEvent(jobID, this.jobName, this.isProfilingAvailable, + this.submissionTimestamp, newJobStatus); - event = new VertexAssignmentEvent(timestamp, managementVertexID, instanceName); - } - - this.eventCollector.updateManagementGraph(jobID, event); - this.eventCollector.addEvent(this.jobID, event); + this.eventCollector.addEvent(jobID, + new JobEvent(System.currentTimeMillis(), newJobStatus, optionalMessage)); } } @@ -344,8 +216,7 @@ public final class EventCollector extends TimerTask implements ProfilingListener * true if {@link ManagementEvent} objects shall be added to the list as well, * false otherwise */ - public void getEventsForJob(final JobID jobID, final List eventList, - final boolean includeManagementEvents) { + public void getEventsForJob(JobID jobID, List eventList, boolean includeManagementEvents) { synchronized (this.collectedEvents) { @@ -431,15 +302,15 @@ public final class EventCollector extends TimerTask implements ProfilingListener * @param jobStatus * the status of the job */ - private void updateRecentJobEvent(final JobID jobID, final String jobName, final boolean isProfilingEnabled, - final long submissionTimestamp, final JobStatus jobStatus) { - + private void updateRecentJobEvent(JobID jobID, String jobName, boolean isProfilingEnabled, + long submissionTimestamp, JobStatus jobStatus) + { final long currentTime = System.currentTimeMillis(); + final RecentJobEvent recentJobEvent = new RecentJobEvent(jobID, jobName, jobStatus, isProfilingEnabled, submissionTimestamp, currentTime); synchronized (this.recentJobs) { - this.recentJobs.put(jobID, recentJobEvent); } } @@ -448,7 +319,7 @@ public final class EventCollector extends TimerTask implements ProfilingListener * Registers a job in form of its execution graph representation * with the job progress collector. The collector will subscribe * to state changes of the individual subtasks. A separate - * deregistration is not necessary since the job progress collector + * de-registration is not necessary since the job progress collector * periodically discards outdated progress information. * * @param executionGraph @@ -458,26 +329,12 @@ public final class EventCollector extends TimerTask implements ProfilingListener * @param submissionTimestamp * the submission time stamp of the job */ - public void registerJob(final ExecutionGraph executionGraph, final boolean profilingAvailable, - final long submissionTimestamp) { - - final Iterator it = new ExecutionGraphIterator(executionGraph, true); - - while (it.hasNext()) { + public void registerJob(ExecutionGraph executionGraph, boolean profilingAvailable, long submissionTimestamp) { - final ExecutionVertex vertex = it.next(); + executionGraph.registerExecutionListener(new ExecutionListenerWrapper(this, executionGraph)); - // Register the listener object which will pass state changes on to the collector - vertex.registerExecutionListener(new ExecutionListenerWrapper(this, vertex)); - - // Register the listener object which will pass assignment changes on to the collector - vertex.registerVertexAssignmentListener(new VertexAssignmentListenerWrapper(this, executionGraph.getJobID())); - } - - // Register one job status listener wrapper for the entire job executionGraph.registerJobStatusListener(new JobStatusListenerWrapper(this, executionGraph.getJobName(), profilingAvailable, submissionTimestamp)); - } /** @@ -547,7 +404,6 @@ public final class EventCollector extends TimerTask implements ProfilingListener @Override public void processProfilingEvents(final ProfilingEvent profilingEvent) { - // Simply add profiling events to the job's event queue addEvent(profilingEvent.getJobID(), profilingEvent); } @@ -561,7 +417,6 @@ public final class EventCollector extends TimerTask implements ProfilingListener * the management graph to be added */ void addManagementGraph(final JobID jobID, final ManagementGraph managementGraph) { - synchronized (this.recentManagementGraphs) { this.recentManagementGraphs.put(jobID, managementGraph); } @@ -576,38 +431,12 @@ public final class EventCollector extends TimerTask implements ProfilingListener * @return the management graph for the job with the given ID or null if no such graph exists */ public ManagementGraph getManagementGraph(final JobID jobID) { - synchronized (this.recentManagementGraphs) { return this.recentManagementGraphs.get(jobID); } } /** - * Applies changes in the vertex assignment to the stored management graph. - * - * @param jobID - * the ID of the job whose management graph shall be updated - * @param vertexAssignmentEvent - * the event describing the changes in the vertex assignment - */ - private void updateManagementGraph(final JobID jobID, final VertexAssignmentEvent vertexAssignmentEvent) { - - synchronized (this.recentManagementGraphs) { - - final ManagementGraph managementGraph = this.recentManagementGraphs.get(jobID); - if (managementGraph == null) { - return; - } - final ManagementVertex vertex = managementGraph.getVertexByID(vertexAssignmentEvent.getVertexID()); - if (vertex == null) { - return; - } - - vertex.setInstanceName(vertexAssignmentEvent.getInstanceName()); - } - } - - /** * Applies changes in the state of an execution vertex to the stored management graph. * * @param jobID @@ -615,7 +444,7 @@ public final class EventCollector extends TimerTask implements ProfilingListener * @param executionStateChangeEvent * the event describing the changes in the execution state of the vertex */ - private void updateManagementGraph(final JobID jobID, final ExecutionStateChangeEvent executionStateChangeEvent, String optionalMessage) { + private void updateManagementGraph(JobID jobID, ExecutionStateChangeEvent executionStateChangeEvent, String optionalMessage) { synchronized (this.recentManagementGraphs) { @@ -629,7 +458,7 @@ public final class EventCollector extends TimerTask implements ProfilingListener } vertex.setExecutionState(executionStateChangeEvent.getNewExecutionState()); - if (executionStateChangeEvent.getNewExecutionState() == ExecutionState.FAILED) { + if (optionalMessage != null) { vertex.setOptMessage(optionalMessage); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b32e77a2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java index 3b76b78..fc76d73 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.runtime.jobmanager; import java.io.File; @@ -25,11 +24,10 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; -import java.util.HashSet; -import java.util.Iterator; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -48,61 +46,49 @@ import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.core.io.StringRecord; +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.InputSplitAssigner; import org.apache.flink.runtime.ExecutionMode; +import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.accumulators.AccumulatorEvent; import org.apache.flink.runtime.client.AbstractJobResult; +import org.apache.flink.runtime.client.AbstractJobResult.ReturnCode; import org.apache.flink.runtime.client.JobCancelResult; import org.apache.flink.runtime.client.JobProgressResult; import org.apache.flink.runtime.client.JobSubmissionResult; -import org.apache.flink.runtime.client.AbstractJobResult.ReturnCode; -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.event.job.AbstractEvent; import org.apache.flink.runtime.event.job.RecentJobEvent; -import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; -import org.apache.flink.runtime.executiongraph.ExecutionEdge; import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionGraphIterator; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.executiongraph.ExecutionVertexID; -import org.apache.flink.runtime.executiongraph.GraphConversionException; -import org.apache.flink.runtime.executiongraph.InternalJobStatus; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.JobStatusListener; -import org.apache.flink.runtime.instance.DefaultInstanceManager; import org.apache.flink.runtime.instance.HardwareDescription; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceConnectionInfo; +import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.instance.InstanceManager; import org.apache.flink.runtime.instance.LocalInstanceManager; import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse; -import org.apache.flink.runtime.io.network.RemoteReceiver; import org.apache.flink.runtime.io.network.channels.ChannelID; import org.apache.flink.runtime.ipc.RPC; import org.apache.flink.runtime.ipc.Server; import org.apache.flink.runtime.jobgraph.AbstractJobVertex; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobID; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager; import org.apache.flink.runtime.jobmanager.archive.ArchiveListener; import org.apache.flink.runtime.jobmanager.archive.MemoryArchivist; import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler; -import org.apache.flink.runtime.jobmanager.splitassigner.InputSplitWrapper; import org.apache.flink.runtime.jobmanager.web.WebInfoServer; import org.apache.flink.runtime.managementgraph.ManagementGraph; -import org.apache.flink.runtime.managementgraph.ManagementVertexID; -import org.apache.flink.runtime.profiling.JobManagerProfiler; -import org.apache.flink.runtime.profiling.ProfilingUtils; import org.apache.flink.runtime.protocols.AccumulatorProtocol; import org.apache.flink.runtime.protocols.ChannelLookupProtocol; import org.apache.flink.runtime.protocols.ExtendedManagementProtocol; import org.apache.flink.runtime.protocols.InputSplitProviderProtocol; import org.apache.flink.runtime.protocols.JobManagerProtocol; -import org.apache.flink.runtime.taskmanager.AbstractTaskResult; -import org.apache.flink.runtime.taskmanager.TaskCancelResult; import org.apache.flink.runtime.taskmanager.TaskExecutionState; -import org.apache.flink.runtime.taskmanager.TaskKillResult; -import org.apache.flink.runtime.taskmanager.TaskSubmissionResult; import org.apache.flink.runtime.types.IntegerRecord; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.ExecutorThreadFactory; @@ -113,15 +99,13 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.PatternLayout; +import com.google.common.base.Preconditions; + /** - * In Nephele the job manager is the central component for communication with clients, creating - * schedules for incoming jobs and supervise their execution. A job manager may only exist once in - * the system and its address must be known the clients. - * Task managers can discover the job manager by means of an UDP broadcast and afterwards advertise - * themselves as new workers for tasks. - * + * The JobManager is the master that coordinates the distributed execution. + * It receives jobs from clients, tracks the distributed execution. */ -public class JobManager implements DeploymentManager, ExtendedManagementProtocol, InputSplitProviderProtocol, +public class JobManager implements ExtendedManagementProtocol, InputSplitProviderProtocol, JobManagerProtocol, ChannelLookupProtocol, JobStatusListener, AccumulatorProtocol { @@ -130,32 +114,46 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol private final static int FAILURE_RETURN_CODE = 1; + /** Executor service for asynchronous commands (to relieve the RPC threads of work) */ private final ExecutorService executorService = Executors.newCachedThreadPool(ExecutorThreadFactory.INSTANCE); - private final Server jobManagerServer; - private final EventCollector eventCollector; - - private final ArchiveListener archive; + /** The RPC end point through which the JobManager gets its calls */ + private final Server jobManagerServer; + /** Keeps track of the currently available task managers */ private final InstanceManager instanceManager; + /** Assigns tasks to slots and keeps track on available and allocated task slots*/ private final DefaultScheduler scheduler; + /** The currently running jobs */ + private final ConcurrentHashMap currentJobs; + + + // begin: these will be consolidated / removed + private final EventCollector eventCollector; + + private final ArchiveListener archive; + private final AccumulatorManager accumulatorManager; - private final int recommendedClientPollingInterval; - + // end: these will be consolidated / removed + private final AtomicBoolean isShutdownInProgress = new AtomicBoolean(false); - + private volatile boolean isShutDown; private WebInfoServer server; + // -------------------------------------------------------------------------------------------- + // Initialization & Shutdown + // -------------------------------------------------------------------------------------------- + public JobManager(ExecutionMode executionMode) throws Exception { final String ipcAddressString = GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); @@ -190,6 +188,8 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol this.archive = null; } + this.currentJobs = new ConcurrentHashMap(); + // Create the accumulator manager, with same archiving limit as web // interface. We need to store the accumulators for at least one job. // Otherwise they might be deleted before the client requested the @@ -218,21 +218,15 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol this.instanceManager = new LocalInstanceManager(numTaskManagers); } else if (executionMode == ExecutionMode.CLUSTER) { - this.instanceManager = new DefaultInstanceManager(); + this.instanceManager = new InstanceManager(); } else { throw new IllegalArgumentException("ExecutionMode"); } - // Try to load the scheduler for the given execution mode - final String schedulerClassName = JobManagerUtils.getSchedulerClassName(executionMode); - LOG.info("Trying to load " + schedulerClassName + " as scheduler"); - - // Try to get the instance manager class name - this.scheduler = JobManagerUtils.loadScheduler(schedulerClassName, this, this.instanceManager); - if (this.scheduler == null) { - throw new Exception("Unable to load scheduler " + schedulerClassName); - } + // create the scheduler and make it listen at the availability of new instances + this.scheduler = new DefaultScheduler(); + this.instanceManager.addInstanceListener(this.scheduler); } public void shutdown() { @@ -275,393 +269,223 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol LOG.debug("Shutdown of job manager completed"); } - /** - * Entry point for the program - * - * @param args - * arguments from the command line - */ - - public static void main(String[] args) { - // determine if a valid log4j config exists and initialize a default logger if not - if (System.getProperty("log4j.configuration") == null) { - Logger root = Logger.getRootLogger(); - root.removeAllAppenders(); - PatternLayout layout = new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n"); - ConsoleAppender appender = new ConsoleAppender(layout, "System.err"); - root.addAppender(appender); - root.setLevel(Level.INFO); - } - - JobManager jobManager; - try { - jobManager = initialize(args); - // Start info server for jobmanager - jobManager.startInfoServer(); - } - catch (Exception e) { - LOG.fatal(e.getMessage(), e); - System.exit(FAILURE_RETURN_CODE); - } - - // Clean up is triggered through a shutdown hook - // freeze this thread to keep the JVM alive (the job manager threads are daemon threads) - Object w = new Object(); - synchronized (w) { - try { - w.wait(); - } catch (InterruptedException e) {} - } - } - - @SuppressWarnings("static-access") - public static JobManager initialize(String[] args) throws Exception { - final Option configDirOpt = OptionBuilder.withArgName("config directory").hasArg() - .withDescription("Specify configuration directory.").create("configDir"); - - final Option executionModeOpt = OptionBuilder.withArgName("execution mode").hasArg() - .withDescription("Specify execution mode.").create("executionMode"); - - final Options options = new Options(); - options.addOption(configDirOpt); - options.addOption(executionModeOpt); - - CommandLineParser parser = new GnuParser(); - CommandLine line = null; - try { - line = parser.parse(options, args); - } catch (ParseException e) { - LOG.error("CLI Parsing failed. Reason: " + e.getMessage()); - System.exit(FAILURE_RETURN_CODE); - } - - final String configDir = line.getOptionValue(configDirOpt.getOpt(), null); - final String executionModeName = line.getOptionValue(executionModeOpt.getOpt(), "local"); - - ExecutionMode executionMode = null; - if ("local".equals(executionModeName)) { - executionMode = ExecutionMode.LOCAL; - } else if ("cluster".equals(executionModeName)) { - executionMode = ExecutionMode.CLUSTER; - } else { - System.err.println("Unrecognized execution mode: " + executionModeName); - System.exit(FAILURE_RETURN_CODE); - } - - // print some startup environment info, like user, code revision, etc - EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager"); - - // First, try to load global configuration - GlobalConfiguration.loadConfiguration(configDir); - - // Create a new job manager object - JobManager jobManager = new JobManager(executionMode); - - // Set base dir for info server - Configuration infoserverConfig = GlobalConfiguration.getConfiguration(); - if (configDir != null && new File(configDir).isDirectory()) { - infoserverConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir+"/.."); - } - GlobalConfiguration.includeConfiguration(infoserverConfig); - return jobManager; - } - + // -------------------------------------------------------------------------------------------- + // Job Execution + // -------------------------------------------------------------------------------------------- @Override public JobSubmissionResult submitJob(JobGraph job) throws IOException { + + boolean success = false; + try { // First check if job is null if (job == null) { return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, "Submitted job is null!"); } - if (LOG.isDebugEnabled()) { - LOG.debug("Submitted job " + job.getName() + " is not null"); - } - - // Check if any vertex of the graph has null edges - AbstractJobVertex jv = job.findVertexWithNullEdges(); - if (jv != null) { - JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, "Vertex " - + jv.getName() + " has at least one null edge"); - return result; + if (LOG.isInfoEnabled()) { + LOG.info(String.format("Received job %s (%s)", job.getJobID(), job.getName())); } - - if (LOG.isDebugEnabled()) { - LOG.debug("Submitted job " + job.getName() + " has no null edges"); + + // get the existing execution graph (if we attach), or construct a new empty one to attach + ExecutionGraph executionGraph = this.currentJobs.get(job.getJobID()); + if (executionGraph == null) { + if (LOG.isInfoEnabled()) { + LOG.info("Creating new execution graph for job " + job.getJobID() + " (" + job.getName() + ')'); + } + + executionGraph = new ExecutionGraph(job.getJobID(), job.getName(), job.getJobConfiguration(), this.executorService); + ExecutionGraph previous = this.currentJobs.putIfAbsent(job.getJobID(), executionGraph); + if (previous != null) { + throw new JobException("Concurrent submission of a job with the same jobId: " + job.getJobID()); + } } - - // Next, check if the graph is weakly connected - if (!job.isWeaklyConnected()) { - JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, - "Job graph is not weakly connected"); - return result; + else { + if (LOG.isInfoEnabled()) { + LOG.info(String.format("Found existing execution graph for id %s, attaching this job.", job.getJobID())); + } } - - if (LOG.isDebugEnabled()) { - LOG.debug("The graph of job " + job.getName() + " is weakly connected"); + + // grab the class loader for user-defined code + final ClassLoader userCodeLoader = LibraryCacheManager.getClassLoader(job.getJobID()); + if (userCodeLoader == null) { + throw new JobException("The user code class loader could not be initialized."); } - - // Check if job graph has cycles - if (!job.isAcyclic()) { - JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, - "Job graph is not a DAG"); - return result; + + String[] jarFilesForJob = LibraryCacheManager.getRequiredJarFiles(job.getJobID()); + for (String fileId : jarFilesForJob) { + executionGraph.addUserCodeJarFile(fileId); } - + + // first, perform the master initialization of the nodes if (LOG.isDebugEnabled()) { - LOG.debug("The graph of job " + job.getName() + " is acyclic"); - } - - // Check constrains on degree - jv = job.areVertexDegreesCorrect(); - if (jv != null) { - JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, - "Degree of vertex " + jv.getName() + " is incorrect"); - return result; + LOG.debug(String.format("Running master initialization of job %s (%s)", job.getJobID(), job.getName())); } - - if (LOG.isDebugEnabled()) { - LOG.debug("All vertices of job " + job.getName() + " have the correct degree"); + try { + for (AbstractJobVertex vertex : job.getVertices()) { + // check that the vertex has an executable class + String executableClass = vertex.getInvokableClassName(); + if (executableClass == null || executableClass.length() == 0) { + throw new JobException(String.format("The vertex %s (%s) has no invokable class.", vertex.getID(), vertex.getName())); + } + + // master side initialization + vertex.initializeOnMaster(userCodeLoader); + } } - - if (!job.isInstanceDependencyChainAcyclic()) { - JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, - "The dependency chain for instance sharing contains a cycle"); - - return result; + catch (FileNotFoundException e) { + LOG.error("File-not-Found: " + e.getMessage()); + return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, e.getMessage()); } - + + // first topologically sort the job vertices to form the basis of creating the execution graph + List topoSorted = job.getVerticesSortedTopologicallyFromSources(); + + // first convert this job graph to an execution graph if (LOG.isDebugEnabled()) { - LOG.debug("The dependency chain for instance sharing is acyclic"); + LOG.debug(String.format("Adding %d vertices from job graph %s (%s)", topoSorted.size(), job.getJobID(), job.getName())); } - - // Try to create initial execution graph from job graph - LOG.info("Creating initial execution graph from job graph " + job.getName()); - ExecutionGraph eg; - - try { - eg = new ExecutionGraph(job, 1); - } catch (GraphConversionException e) { - if (e.getCause() == null) { - return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(e)); - } else { - Throwable t = e.getCause(); - if (t instanceof FileNotFoundException) { - return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, t.getMessage()); - } else { - return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(t)); - } - } + + executionGraph.attachJobGraph(topoSorted); + + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Successfully created execution graph from job graph %s (%s)", job.getJobID(), job.getName())); } // Register job with the progress collector if (this.eventCollector != null) { - this.eventCollector.registerJob(eg, false, System.currentTimeMillis()); + this.eventCollector.registerJob(executionGraph, false, System.currentTimeMillis()); } // Register for updates on the job status - eg.registerJobStatusListener(this); + executionGraph.registerJobStatusListener(this); // Schedule job if (LOG.isInfoEnabled()) { LOG.info("Scheduling job " + job.getName()); } - try { - this.scheduler.scheduleJob(eg); - } catch (SchedulingException e) { - unregisterJob(eg); - JobSubmissionResult result = new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(e)); - return result; - } + executionGraph.scheduleForExecution(this.scheduler); // Return on success + success = true; return new JobSubmissionResult(AbstractJobResult.ReturnCode.SUCCESS, null); } catch (Throwable t) { LOG.error("Job submission failed.", t); return new JobSubmissionResult(AbstractJobResult.ReturnCode.ERROR, StringUtils.stringifyException(t)); } - } - - - public InstanceManager getInstanceManager() { - return this.instanceManager; - } - - /** - * This method is a convenience method to unregister a job from all of - * Nephele's monitoring, profiling and optimization components at once. - * Currently, it is only being used to unregister from profiling (if activated). - * - * @param executionGraph - * the execution graph to remove from the job manager - */ - private void unregisterJob(final ExecutionGraph executionGraph) { - - // Remove job from profiler (if activated) - if (this.profiler != null - && executionGraph.getJobConfiguration().getBoolean(ProfilingUtils.PROFILE_JOB_KEY, true)) { - this.profiler.unregisterProfilingJob(executionGraph); - - if (this.eventCollector != null) { - this.profiler.unregisterFromProfilingData(executionGraph.getJobID(), this.eventCollector); - } - } - - // Remove job from input split manager - if (this.inputSplitManager != null) { - this.inputSplitManager.unregisterJob(executionGraph); - } - - // Unregister job with library cache manager - try { - LibraryCacheManager.unregister(executionGraph.getJobID()); - } catch (IOException ioe) { - if (LOG.isWarnEnabled()) { - LOG.warn(ioe); + finally { + if (!success) { + this.currentJobs.remove(job.getJobID()); + + try { + LibraryCacheManager.unregister(job.getJobID()); + } + catch (IllegalStateException e) { + // may happen if the job failed before being registered at the + // library cache manager + } + catch (Throwable t) { + LOG.error("Error while de-registering job at library cache manager.", t); + } } } } - @Override - public void sendHeartbeat(final InstanceConnectionInfo instanceConnectionInfo) { - - // Delegate call to instance manager - if (this.instanceManager != null) { - - final Runnable heartBeatRunnable = new Runnable() { + public JobCancelResult cancelJob(JobID jobID) throws IOException { - @Override - public void run() { - instanceManager.reportHeartBeat(instanceConnectionInfo); - } - }; + LOG.info("Trying to cancel job with ID " + jobID); - this.executorService.execute(heartBeatRunnable); + final ExecutionGraph eg = this.currentJobs.get(jobID); + if (eg == null) { + LOG.info("No job found with ID " + jobID); + return new JobCancelResult(ReturnCode.ERROR, "Cannot find job with ID " + jobID); } - } - @Override - public RegisterTaskManagerResult registerTaskManager(final InstanceConnectionInfo instanceConnectionInfo, - final HardwareDescription hardwareDescription, final IntegerRecord numberOfSlots){ - if(this.instanceManager != null) { - final Runnable registerTaskManagerRunnable = new Runnable() { - @Override - public void run(){ - instanceManager.registerTaskManager(instanceConnectionInfo, hardwareDescription, - numberOfSlots.getValue()); - } - }; + final Runnable cancelJobRunnable = new Runnable() { + @Override + public void run() { + eg.cancel(); + } + }; - this.executorService.execute(registerTaskManagerRunnable); - return new RegisterTaskManagerResult(RegisterTaskManagerResult.ReturnCode.SUCCESS); - } + eg.execute(cancelJobRunnable); - return new RegisterTaskManagerResult(RegisterTaskManagerResult.ReturnCode.FAILURE); + return new JobCancelResult(AbstractJobResult.ReturnCode.SUCCESS, null); } - - + @Override - public void updateTaskExecutionState(final TaskExecutionState executionState) throws IOException { - - // Ignore calls with executionResult == null - if (executionState == null) { - LOG.error("Received call to updateTaskExecutionState with executionState == null"); - return; - } + public void updateTaskExecutionState(TaskExecutionState executionState) throws IOException { + Preconditions.checkNotNull(executionState); - if (executionState.getExecutionState() == ExecutionState.FAILED) { - LOG.error(executionState.getDescription()); - } - final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(executionState.getJobID()); + final ExecutionGraph eg = this.currentJobs.get(executionState.getJobID()); if (eg == null) { LOG.error("Cannot find execution graph for ID " + executionState.getJobID() + " to change state to " + executionState.getExecutionState()); return; } - final ExecutionVertex vertex = eg.getVertexByID(executionState.getID()); - if (vertex == null) { - LOG.error("Cannot find vertex with ID " + executionState.getID() + " of job " + eg.getJobID() - + " to change state to " + executionState.getExecutionState()); - return; - } - - // Asynchronously update execute state of vertex - vertex.updateExecutionStateAsynchronously(executionState.getExecutionState(), executionState.getDescription()); + eg.updateState(executionState); } - - + @Override - public JobCancelResult cancelJob(final JobID jobID) throws IOException { + public InputSplit requestNextInputSplit(JobID jobID, JobVertexID vertexId) throws IOException { - LOG.info("Trying to cancel job with ID " + jobID); - - final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID); - if (eg == null) { - return new JobCancelResult(ReturnCode.ERROR, "Cannot find job with ID " + jobID); + final ExecutionGraph graph = this.currentJobs.get(jobID); + if (graph == null) { + LOG.error("Cannot find execution graph to job ID " + jobID); + return null; } - final Runnable cancelJobRunnable = new Runnable() { - - @Override - public void run() { - eg.updateJobStatus(InternalJobStatus.CANCELING, "Job canceled by user"); - final TaskCancelResult cancelResult = cancelJob(eg); - if (cancelResult != null) { - LOG.error(cancelResult.getDescription()); - } - } - }; - - eg.executeCommand(cancelJobRunnable); - - LOG.info("Cancel of job " + jobID + " successfully triggered"); + final ExecutionJobVertex vertex = graph.getJobVertex(vertexId); + if (vertex == null) { + LOG.error("Cannot find execution vertex for vertex ID " + vertexId); + return null; + } - return new JobCancelResult(AbstractJobResult.ReturnCode.SUCCESS, null); + InputSplitAssigner splitAssigner = vertex.getSplitAssigner(); + if (splitAssigner == null) { + LOG.error("No InputSplitAssigner for vertex ID " + vertexId); + return null; + } + + + return splitAssigner.getNextInputSplit(null); } + + @Override + public void jobStatusHasChanged(ExecutionGraph executionGraph, JobStatus newJobStatus, String optionalMessage) { - /** - * Cancels all the tasks in the current and upper stages of the - * given execution graph. - * - * @param eg - * the execution graph representing the job to cancel. - * @return null if no error occurred during the cancel attempt, - * otherwise the returned object will describe the error - */ - private TaskCancelResult cancelJob(final ExecutionGraph eg) { - - TaskCancelResult errorResult = null; - - /** - * Cancel all nodes in the current and upper execution stages. - */ - final Iterator it = new ExecutionGraphIterator(eg, eg.getIndexOfCurrentExecutionStage(), - false, true); - while (it.hasNext()) { + final JobID jid = executionGraph.getJobID(); + + if (LOG.isInfoEnabled()) { + String message = optionalMessage == null ? "." : ": " + optionalMessage; + LOG.info(String.format("Status of job %s (%s) changed to %s%s", + jid, executionGraph.getJobName(), newJobStatus, message)); + } - final ExecutionVertex vertex = it.next(); - final TaskCancelResult result = vertex.cancelTask(); - if (result.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS) { - errorResult = result; + // remove the job graph if the state is any terminal state + if (newJobStatus == JobStatus.FINISHED || newJobStatus == JobStatus.CANCELED || newJobStatus == JobStatus.FAILED) { + this.currentJobs.remove(jid); + + try { + LibraryCacheManager.unregister(jid); + } + catch (Throwable t) { + LOG.warn("Could not properly unregister job " + jid + " from the library cache."); } } - - return errorResult; } - @Override public JobProgressResult getJobProgress(final JobID jobID) throws IOException { if (this.eventCollector == null) { - return new JobProgressResult(ReturnCode.ERROR, "JobManager does not support progress reports for jobs", - null); + return new JobProgressResult(ReturnCode.ERROR, "JobManager does not support progress reports for jobs", null); } final SerializableArrayList eventList = new SerializableArrayList(); @@ -674,104 +498,32 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol @Override public ConnectionInfoLookupResponse lookupConnectionInfo(InstanceConnectionInfo caller, JobID jobID, ChannelID sourceChannelID) { - final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID); + final ExecutionGraph eg = this.currentJobs.get(jobID); if (eg == null) { LOG.error("Cannot find execution graph to job ID " + jobID); return ConnectionInfoLookupResponse.createReceiverNotFound(); } - final InternalJobStatus jobStatus = eg.getJobStatus(); - if (jobStatus == InternalJobStatus.FAILING || jobStatus == InternalJobStatus.CANCELING) { - return ConnectionInfoLookupResponse.createJobIsAborting(); - } - - final ExecutionEdge edge = eg.getEdgeByID(sourceChannelID); - if (edge == null) { - LOG.error("Cannot find execution edge associated with ID " + sourceChannelID); - return ConnectionInfoLookupResponse.createReceiverNotFound(); - } - - if (sourceChannelID.equals(edge.getInputChannelID())) { - // Request was sent from an input channel + return eg.lookupConnectionInfoAndDeployReceivers(caller, sourceChannelID); + } - final ExecutionVertex connectedVertex = edge.getOutputGate().getVertex(); - - final Instance assignedInstance = connectedVertex.getAllocatedResource().getInstance(); - if (assignedInstance == null) { - LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getOutputGateIndex() - + " but no instance assigned"); - // LOG.info("Created receiverNotReady for " + connectedVertex + " 1"); - return ConnectionInfoLookupResponse.createReceiverNotReady(); - } - - // Check execution state - final ExecutionState executionState = connectedVertex.getExecutionState(); - if (executionState == ExecutionState.FINISHED) { - // that should not happen. if there is data pending, the receiver cannot be ready - return ConnectionInfoLookupResponse.createReceiverNotFound(); - } - - // running is common, finishing is happens when the lookup is for the close event - if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHING) { - // LOG.info("Created receiverNotReady for " + connectedVertex + " in state " + executionState + " 2"); - return ConnectionInfoLookupResponse.createReceiverNotReady(); - } - - if (assignedInstance.getInstanceConnectionInfo().equals(caller)) { - // Receiver runs on the same task manager - return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getOutputChannelID()); - } else { - // Receiver runs on a different task manager - - final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo(); - final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort()); - - return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge.getConnectionID())); - } - } - // else, the request is for an output channel - // Find vertex of connected input channel - final ExecutionVertex targetVertex = edge.getInputGate().getVertex(); - - // Check execution state - final ExecutionState executionState = targetVertex.getExecutionState(); - - // check whether the task needs to be deployed - if (executionState != ExecutionState.RUNNING && executionState != ExecutionState.FINISHING && executionState != ExecutionState.FINISHED) { - - if (executionState == ExecutionState.ASSIGNED) { - final Runnable command = new Runnable() { - @Override - public void run() { - scheduler.deployAssignedVertices(targetVertex); - } - }; - eg.executeCommand(command); - } - - // LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 3"); - return ConnectionInfoLookupResponse.createReceiverNotReady(); - } - - final Instance assignedInstance = targetVertex.getAllocatedResource().getInstance(); - if (assignedInstance == null) { - LOG.error("Cannot resolve lookup: vertex found for channel ID " + edge.getInputChannelID() + " but no instance assigned"); - // LOG.info("Created receiverNotReady for " + targetVertex + " in state " + executionState + " 4"); - return ConnectionInfoLookupResponse.createReceiverNotReady(); - } - - if (assignedInstance.getInstanceConnectionInfo().equals(caller)) { - // Receiver runs on the same task manager - return ConnectionInfoLookupResponse.createReceiverFoundAndReady(edge.getInputChannelID()); - } else { - // Receiver runs on a different task manager - final InstanceConnectionInfo ici = assignedInstance.getInstanceConnectionInfo(); - final InetSocketAddress isa = new InetSocketAddress(ici.address(), ici.dataPort()); - - return ConnectionInfoLookupResponse.createReceiverFoundAndReady(new RemoteReceiver(isa, edge.getConnectionID())); - } + // -------------------------------------------------------------------------------------------- + // Properties + // -------------------------------------------------------------------------------------------- + + /** + * Tests whether the job manager has been shut down completely. + * + * @return true if the job manager has been shut down completely, false otherwise + */ + public boolean isShutDown() { + return this.isShutDown; } - + + public InstanceManager getInstanceManager() { + return this.instanceManager; + } + /** * Returns current ManagementGraph from eventCollector and, if not current, from archive * @@ -828,239 +580,10 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol return eventList; } - - @Override - public void killTask(final JobID jobID, final ManagementVertexID id) throws IOException { - - final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID); - if (eg == null) { - LOG.error("Cannot find execution graph for job " + jobID); - return; - } - - final ExecutionVertex vertex = eg.getVertexByID(ExecutionVertexID.fromManagementVertexID(id)); - if (vertex == null) { - LOG.error("Cannot find execution vertex with ID " + id); - return; - } - - LOG.info("Killing task " + vertex + " of job " + jobID); - - final Runnable runnable = new Runnable() { - - @Override - public void run() { - - final TaskKillResult result = vertex.killTask(); - if (result.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS) { - LOG.error(result.getDescription()); - } - } - }; - - eg.executeCommand(runnable); - } - - /** - * Tests whether the job manager has been shut down completely. - * - * @return true if the job manager has been shut down completely, false otherwise - */ - public boolean isShutDown() { - return this.isShutDown; - } - - - - @Override - public void jobStatusHasChanged(final ExecutionGraph executionGraph, final InternalJobStatus newJobStatus, - final String optionalMessage) { - - LOG.info("Status of job " + executionGraph.getJobName() + "(" + executionGraph.getJobID() + ")" - + " changed to " + newJobStatus); - - if (newJobStatus == InternalJobStatus.FAILING) { - - // Cancel all remaining tasks - cancelJob(executionGraph); - } - - if (newJobStatus == InternalJobStatus.CANCELED || newJobStatus == InternalJobStatus.FAILED - || newJobStatus == InternalJobStatus.FINISHED) { - // Unregister job for Nephele's monitoring, optimization components, and dynamic input split assignment - unregisterJob(executionGraph); - } - } - - - @Override - public void logBufferUtilization(final JobID jobID) throws IOException { - - final ExecutionGraph eg = this.scheduler.getExecutionGraphByID(jobID); - if (eg == null) { - return; - } - - final Set allocatedInstance = new HashSet(); - - final Iterator it = new ExecutionGraphIterator(eg, true); - while (it.hasNext()) { - - final ExecutionVertex vertex = it.next(); - final ExecutionState state = vertex.getExecutionState(); - if (state == ExecutionState.RUNNING || state == ExecutionState.FINISHING) { - final Instance instance = vertex.getAllocatedResource().getInstance(); - - if (instance instanceof DummyInstance) { - LOG.error("Found instance of type DummyInstance for vertex " + vertex.getName() + " (state " - + state + ")"); - continue; - } - - allocatedInstance.add(instance); - } - } - - // Send requests to task managers from separate thread - final Runnable requestRunnable = new Runnable() { - - @Override - public void run() { - - final Iterator it2 = allocatedInstance.iterator(); - - try { - while (it2.hasNext()) { - it2.next().logBufferUtilization(); - } - } catch (IOException ioe) { - LOG.error(ioe); - } - - } - }; - - // Hand over to the executor service - this.executorService.execute(requestRunnable); - } - @Override public int getAvailableSlots() { return getInstanceManager().getTotalNumberOfSlots(); } - - - @Override - public void deploy(final JobID jobID, final Instance instance, - final List verticesToBeDeployed) { - - if (verticesToBeDeployed.isEmpty()) { - LOG.error("Method 'deploy' called but list of vertices to be deployed is empty"); - return; - } - - for (final ExecutionVertex vertex : verticesToBeDeployed) { - - // Check vertex state - if (vertex.getExecutionState() != ExecutionState.READY) { - LOG.error("Expected vertex " + vertex + " to be in state READY but it is in state " - + vertex.getExecutionState()); - } - - vertex.updateExecutionState(ExecutionState.STARTING, null); - } - - // Create a new runnable and pass it the executor service - final Runnable deploymentRunnable = new Runnable() { - - /** - * {@inheritDoc} - */ - @Override - public void run() { - - // Check if all required libraries are available on the instance - try { - instance.checkLibraryAvailability(jobID); - } catch (IOException ioe) { - LOG.error("Cannot check library availability: " + StringUtils.stringifyException(ioe)); - } - - final List submissionList = new SerializableArrayList(); - - // Check the consistency of the call - for (final ExecutionVertex vertex : verticesToBeDeployed) { - - submissionList.add(vertex.constructDeploymentDescriptor()); - - LOG.info("Starting task " + vertex + " on " + vertex.getAllocatedResource().getInstance()); - } - - List submissionResultList = null; - - try { - submissionResultList = instance.submitTasks(submissionList); - } catch (final IOException ioe) { - final String errorMsg = StringUtils.stringifyException(ioe); - for (final ExecutionVertex vertex : verticesToBeDeployed) { - vertex.updateExecutionStateAsynchronously(ExecutionState.FAILED, errorMsg); - } - } - - if (verticesToBeDeployed.size() != submissionResultList.size()) { - LOG.error("size of submission result list does not match size of list with vertices to be deployed"); - } - - int count = 0; - for (final TaskSubmissionResult tsr : submissionResultList) { - - ExecutionVertex vertex = verticesToBeDeployed.get(count++); - if (!vertex.getID().equals(tsr.getVertexID())) { - LOG.error("Expected different order of objects in task result list"); - vertex = null; - for (final ExecutionVertex candVertex : verticesToBeDeployed) { - if (tsr.getVertexID().equals(candVertex.getID())) { - vertex = candVertex; - break; - } - } - - if (vertex == null) { - LOG.error("Cannot find execution vertex for vertex ID " + tsr.getVertexID()); - continue; - } - } - - if (tsr.getReturnCode() != AbstractTaskResult.ReturnCode.SUCCESS) { - // Change the execution state to failed and let the scheduler deal with the rest - vertex.updateExecutionStateAsynchronously(ExecutionState.FAILED, tsr.getDescription()); - } - } - } - }; - - this.executorService.execute(deploymentRunnable); - } - - - @Override - public InputSplitWrapper requestNextInputSplit(final JobID jobID, final ExecutionVertexID vertexID, - final IntegerRecord sequenceNumber) throws IOException { - - final ExecutionGraph graph = this.scheduler.getExecutionGraphByID(jobID); - if (graph == null) { - LOG.error("Cannot find execution graph to job ID " + jobID); - return null; - } - - final ExecutionVertex vertex = graph.getVertexByID(vertexID); - if (vertex == null) { - LOG.error("Cannot find execution vertex for vertex ID " + vertexID); - return null; - } - - return new InputSplitWrapper(jobID, this.inputSplitManager.getNextInputSplit(vertex, sequenceNumber.getValue())); - } /** * Starts the Jetty Infoserver for the Jobmanager @@ -1081,17 +604,11 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol } - // TODO Add to RPC? public List getOldJobs() throws IOException { - - //final List eventList = new SerializableArrayList(); - if (this.archive == null) { throw new IOException("No instance of the event collector found"); } - //this.eventCollector.getRecentJobs(eventList); - return this.archive.getJobs(); } @@ -1103,8 +620,8 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol return this.instanceManager.getNumberOfRegisteredTaskManagers(); } - public Map getInstances() { - return this.instanceManager.getInstances(); + public Map getInstances() { + return this.instanceManager.getAllRegisteredInstances(); } @Override @@ -1120,4 +637,118 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol public Map> getAccumulators(JobID jobID) { return this.accumulatorManager.getJobAccumulators(jobID); } + + public Map getCurrentJobs() { + return Collections.unmodifiableMap(currentJobs); + } + + // -------------------------------------------------------------------------------------------- + // TaskManager to JobManager communication + // -------------------------------------------------------------------------------------------- + + @Override + public boolean sendHeartbeat(InstanceID taskManagerId) { + return this.instanceManager.reportHeartBeat(taskManagerId); + } + + @Override + public InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription hardwareDescription, int numberOfSlots) { + return this.instanceManager.registerTaskManager(instanceConnectionInfo, hardwareDescription, numberOfSlots); + } + + + // -------------------------------------------------------------------------------------------- + // Executable + // -------------------------------------------------------------------------------------------- + + /** + * Entry point for the program + * + * @param args + * arguments from the command line + */ + + public static void main(String[] args) { + // determine if a valid log4j config exists and initialize a default logger if not + if (System.getProperty("log4j.configuration") == null) { + Logger root = Logger.getRootLogger(); + root.removeAllAppenders(); + PatternLayout layout = new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n"); + ConsoleAppender appender = new ConsoleAppender(layout, "System.err"); + root.addAppender(appender); + root.setLevel(Level.INFO); + } + + JobManager jobManager; + try { + jobManager = initialize(args); + // Start info server for jobmanager + jobManager.startInfoServer(); + } + catch (Exception e) { + LOG.fatal(e.getMessage(), e); + System.exit(FAILURE_RETURN_CODE); + } + + // Clean up is triggered through a shutdown hook + // freeze this thread to keep the JVM alive (the job manager threads are daemon threads) + Object w = new Object(); + synchronized (w) { + try { + w.wait(); + } catch (InterruptedException e) {} + } + } + + @SuppressWarnings("static-access") + public static JobManager initialize(String[] args) throws Exception { + final Option configDirOpt = OptionBuilder.withArgName("config directory").hasArg() + .withDescription("Specify configuration directory.").create("configDir"); + + final Option executionModeOpt = OptionBuilder.withArgName("execution mode").hasArg() + .withDescription("Specify execution mode.").create("executionMode"); + + final Options options = new Options(); + options.addOption(configDirOpt); + options.addOption(executionModeOpt); + + CommandLineParser parser = new GnuParser(); + CommandLine line = null; + try { + line = parser.parse(options, args); + } catch (ParseException e) { + LOG.error("CLI Parsing failed. Reason: " + e.getMessage()); + System.exit(FAILURE_RETURN_CODE); + } + + final String configDir = line.getOptionValue(configDirOpt.getOpt(), null); + final String executionModeName = line.getOptionValue(executionModeOpt.getOpt(), "local"); + + ExecutionMode executionMode = null; + if ("local".equals(executionModeName)) { + executionMode = ExecutionMode.LOCAL; + } else if ("cluster".equals(executionModeName)) { + executionMode = ExecutionMode.CLUSTER; + } else { + System.err.println("Unrecognized execution mode: " + executionModeName); + System.exit(FAILURE_RETURN_CODE); + } + + // print some startup environment info, like user, code revision, etc + EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager"); + + // First, try to load global configuration + GlobalConfiguration.loadConfiguration(configDir); + + // Create a new job manager object + JobManager jobManager = new JobManager(executionMode); + + // Set base dir for info server + Configuration infoserverConfig = GlobalConfiguration.getConfiguration(); + if (configDir != null && new File(configDir).isDirectory()) { + infoserverConfig.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir+"/.."); + } + GlobalConfiguration.includeConfiguration(infoserverConfig); + return jobManager; + } }