Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 78F1D11F96 for ; Sun, 22 Jun 2014 21:48:18 +0000 (UTC) Received: (qmail 374 invoked by uid 500); 22 Jun 2014 21:48:18 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 349 invoked by uid 500); 22 Jun 2014 21:48:18 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 335 invoked by uid 99); 22 Jun 2014 21:48:18 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 22 Jun 2014 21:48:18 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Sun, 22 Jun 2014 21:47:49 +0000 Received: (qmail 98388 invoked by uid 99); 22 Jun 2014 21:47:22 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 22 Jun 2014 21:47:22 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 7F7D18868BD; Sun, 22 Jun 2014 21:47:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.incubator.apache.org Date: Sun, 22 Jun 2014 21:47:31 -0000 Message-Id: <7939c35f2bb645188a18a894875ec971@git.apache.org> In-Reply-To: <45e2aadd30764067bd3def1f2b5ed7bb@git.apache.org> References: <45e2aadd30764067bd3def1f2b5ed7bb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [10/22] Rework the Taskmanager to a slot based model and remove legacy cloud code X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java index 2c70794..f425695 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/WorksetIterationNode.java @@ -155,8 +155,7 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode UnaryOperatorNode solutionSetDeltaUpdateAux = new UnaryOperatorNode("Solution-Set Delta", getSolutionSetKeyFields(), new SolutionSetDeltaOperator(getSolutionSetKeyFields())); solutionSetDeltaUpdateAux.setDegreeOfParallelism(getDegreeOfParallelism()); - solutionSetDeltaUpdateAux.setSubtasksPerInstance(getSubtasksPerInstance()); - + PactConnection conn = new PactConnection(solutionSetDelta, solutionSetDeltaUpdateAux); solutionSetDeltaUpdateAux.setIncomingConnection(conn); solutionSetDelta.addOutgoingConnection(conn); @@ -218,11 +217,6 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode // -------------------------------------------------------------------------------------------- @Override - public boolean isMemoryConsumer() { - return true; - } - - @Override protected List getPossibleProperties() { return new ArrayList(1); } @@ -331,13 +325,12 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) { // attach a no-op node through which we create the properties of the original input Channel toNoOp = new Channel(candidate); - globPropsReqWorkset.parameterizeChannel(toNoOp, false, false); + globPropsReqWorkset.parameterizeChannel(toNoOp, false); locPropsReqWorkset.parameterizeChannel(toNoOp); UnaryOperatorNode rebuildWorksetPropertiesNode = new UnaryOperatorNode("Rebuild Workset Properties", FieldList.EMPTY_LIST); rebuildWorksetPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism()); - rebuildWorksetPropertiesNode.setSubtasksPerInstance(candidate.getSubtasksPerInstance()); SingleInputPlanNode rebuildWorksetPropertiesPlanNode = new SingleInputPlanNode(rebuildWorksetPropertiesNode, "Rebuild Workset Properties", toNoOp, DriverStrategy.UNARY_NO_OP); rebuildWorksetPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties()); @@ -518,7 +511,6 @@ public class WorksetIterationNode extends TwoInputNode implements IterationNode super(new NoOpBinaryUdfOp(new NothingTypeInfo())); setDegreeOfParallelism(1); - setSubtasksPerInstance(1); } public void setInputs(PactConnection input1, PactConnection input2) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java index 574922a..e769508 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dataproperties/RequestedGlobalProperties.java @@ -53,8 +53,7 @@ public final class RequestedGlobalProperties implements Cloneable { /** * Sets the partitioning property for the global properties. * - * @param partitioning The new partitioning to set. - * @param partitionedFields + * @param partitionedFields */ public void setHashPartitioned(FieldSet partitionedFields) { if (partitionedFields == null) { @@ -218,7 +217,7 @@ public final class RequestedGlobalProperties implements Cloneable { * @param globalDopChange * @param localDopChange */ - public void parameterizeChannel(Channel channel, boolean globalDopChange, boolean localDopChange) { + public void parameterizeChannel(Channel channel, boolean globalDopChange) { // if we request nothing, then we need no special strategy. forward, if the number of instances remains // the same, randomly repartition otherwise if (isTrivial()) { @@ -228,8 +227,7 @@ public final class RequestedGlobalProperties implements Cloneable { final GlobalProperties inGlobals = channel.getSource().getGlobalProperties(); // if we have no global parallelism change, check if we have already compatible global properties - if (!globalDopChange && !localDopChange && isMetBy(inGlobals)) { - // we meet already everything, so go forward + if (!globalDopChange && isMetBy(inGlobals)) { channel.setShipStrategy(ShipStrategyType.FORWARD); return; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupWithPartialPreGroupProperties.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupWithPartialPreGroupProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupWithPartialPreGroupProperties.java index b389855..0ef277e 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupWithPartialPreGroupProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllGroupWithPartialPreGroupProperties.java @@ -48,8 +48,7 @@ public final class AllGroupWithPartialPreGroupProperties extends OperatorDescrip // create an input node for combine with same DOP as input node GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode(); combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism()); - combinerNode.setSubtasksPerInstance(in.getSource().getSubtasksPerInstance()); - + SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.ALL_GROUP_COMBINE); combiner.setCosts(new Costs(0, 0)); combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties()); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllReduceProperties.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllReduceProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllReduceProperties.java index be3ed74..867b9d9 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllReduceProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/AllReduceProperties.java @@ -48,8 +48,7 @@ public final class AllReduceProperties extends OperatorDescriptorSingle // create an input node for combine with same DOP as input node ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode(); combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism()); - combinerNode.setSubtasksPerInstance(in.getSource().getSubtasksPerInstance()); - + SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.ALL_REDUCE); combiner.setCosts(new Costs(0, 0)); combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties()); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceWithCombineProperties.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceWithCombineProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceWithCombineProperties.java index 980cf6d..ec45a53 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceWithCombineProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/GroupReduceWithCombineProperties.java @@ -85,9 +85,9 @@ public final class GroupReduceWithCombineProperties extends OperatorDescriptorSi // create an input node for combine with same DOP as input node GroupReduceNode combinerNode = ((GroupReduceNode) node).getCombinerUtilityNode(); combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism()); - combinerNode.setSubtasksPerInstance(in.getSource().getSubtasksPerInstance()); - - SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE, this.keyList); + + SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract() + .getName()+")", toCombiner, DriverStrategy.SORTED_GROUP_COMBINE, this.keyList); combiner.setCosts(new Costs(0, 0)); combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties()); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/PartialGroupProperties.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/PartialGroupProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/PartialGroupProperties.java index a28feeb..9fb97b5 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/PartialGroupProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/PartialGroupProperties.java @@ -44,9 +44,9 @@ public final class PartialGroupProperties extends OperatorDescriptorSingle { // create in input node for combine with same DOP as input node GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase) node.getPactContract()); combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism()); - combinerNode.setSubtasksPerInstance(in.getSource().getSubtasksPerInstance()); - - return new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract().getName()+")", in, DriverStrategy.SORTED_GROUP_COMBINE, this.keyList); + + return new SingleInputPlanNode(combinerNode, "Combine("+node.getPactContract().getName()+")", in, + DriverStrategy.SORTED_GROUP_COMBINE, this.keyList); } @Override http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/ReduceProperties.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/ReduceProperties.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/ReduceProperties.java index 4539da5..0db3fa5 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/ReduceProperties.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/operators/ReduceProperties.java @@ -56,8 +56,7 @@ public final class ReduceProperties extends OperatorDescriptorSingle { // create an input node for combine with same DOP as input node ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode(); combinerNode.setDegreeOfParallelism(in.getSource().getDegreeOfParallelism()); - combinerNode.setSubtasksPerInstance(in.getSource().getSubtasksPerInstance()); - + SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine ("+node.getPactContract().getName()+")", toCombiner, DriverStrategy.SORTED_PARTIAL_REDUCE, this.keyList); combiner.setCosts(new Costs(0, 0)); combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties()); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java index 8fe95c9..6f9418f 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/Channel.java @@ -68,11 +68,11 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection< private TempMode tempMode; - private long tempMemory; + private double relativeTempMemory; - private long memoryGlobalStrategy; + private double relativeMemoryGlobalStrategy; - private long memoryLocalStrategy; + private double relativeMemoryLocalStrategy; private int replicationFactor = 1; @@ -200,17 +200,17 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection< * * @return The temp memory. */ - public long getTempMemory() { - return this.tempMemory; + public double getRelativeTempMemory() { + return this.relativeTempMemory; } /** * Sets the memory for materializing the channel's result from this Channel. * - * @param tempMemory The memory for materialization. + * @param relativeTempMemory The memory for materialization. */ - public void setTempMemory(long tempMemory) { - this.tempMemory = tempMemory; + public void setRelativeTempMemory(double relativeTempMemory) { + this.relativeTempMemory = relativeTempMemory; } /** @@ -286,20 +286,20 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection< this.localStrategyComparator = localStrategyComparator; } - public long getMemoryGlobalStrategy() { - return memoryGlobalStrategy; + public double getRelativeMemoryGlobalStrategy() { + return relativeMemoryGlobalStrategy; } - public void setMemoryGlobalStrategy(long memoryGlobalStrategy) { - this.memoryGlobalStrategy = memoryGlobalStrategy; + public void setRelativeMemoryGlobalStrategy(double relativeMemoryGlobalStrategy) { + this.relativeMemoryGlobalStrategy = relativeMemoryGlobalStrategy; } - public long getMemoryLocalStrategy() { - return memoryLocalStrategy; + public double getRelativeMemoryLocalStrategy() { + return relativeMemoryLocalStrategy; } - public void setMemoryLocalStrategy(long memoryLocalStrategy) { - this.memoryLocalStrategy = memoryLocalStrategy; + public void setRelativeMemoryLocalStrategy(double relativeMemoryLocalStrategy) { + this.relativeMemoryLocalStrategy = relativeMemoryLocalStrategy; } public boolean isOnDynamicPath() { @@ -437,33 +437,6 @@ public class Channel implements EstimateProvider, Cloneable, DumpableConnection< } throw new CompilerException("Unrecognized Ship Strategy Type: " + this.shipStrategy); } - - public void adjustGlobalPropertiesForLocalParallelismChange() { - if (this.shipStrategy == null || this.shipStrategy == ShipStrategyType.NONE) { - throw new IllegalStateException("Cannot adjust channel for degree of parallelism " + - "change before the ship strategy is set."); - } - - // make sure the properties are acquired - if (this.globalProps == null) { - getGlobalProperties(); - } - - // some strategies globally reestablish properties - switch (this.shipStrategy) { - case FORWARD: - this.globalProps.reset(); - return; - case NONE: // excluded by sanity check. just here to silence compiler warnings check completion - case BROADCAST: - case PARTITION_HASH: - case PARTITION_RANGE: - case PARTITION_RANDOM: - return; - } - - throw new CompilerException("Unrecognized Ship Strategy Type: " + this.shipStrategy); - } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java index 69263bc..539006c 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plan/PlanNode.java @@ -65,12 +65,10 @@ public abstract class PlanNode implements Visitable, DumpableNode, DumpableNode, DumpableNode, DumpableNode= 1 ? n.getDegreeOfParallelism() : "default") + "\""); - writer.print(",\n\t\t\"subtasks_per_instance\": \"" - + (n.getSubtasksPerInstance() >= 1 ? n.getSubtasksPerInstance() : "default") + "\""); - // output node predecessors Iterator> inConns = node.getDumpableInputs().iterator(); String child1name = "", child2name = ""; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java index 5a30fb6..b4c7560 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/plantranslate/NepheleJobGraphGenerator.java @@ -22,9 +22,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import eu.stratosphere.api.common.aggregators.AggregatorRegistry; import eu.stratosphere.api.common.aggregators.AggregatorWithName; import eu.stratosphere.api.common.aggregators.ConvergenceCriterion; @@ -101,7 +98,7 @@ public class NepheleJobGraphGenerator implements Visitor { private static final boolean mergeIterationAuxTasks = GlobalConfiguration.getBoolean(MERGE_ITERATION_AUX_TASKS_KEY, true); - private static final Log LOG = LogFactory.getLog(NepheleJobGraphGenerator.class); +// private static final Log LOG = LogFactory.getLog(NepheleJobGraphGenerator.class); private static final TaskInChain ALREADY_VISITED_PLACEHOLDER = new TaskInChain(null, null, null); @@ -186,13 +183,6 @@ public class NepheleJobGraphGenerator implements Visitor { t.addChainedTask(tic.getChainedTask(), tic.getTaskConfig(), tic.getTaskName()); } - // now that all have been created, make sure that all share their instances with the one - // with the highest degree of parallelism - if (program.getInstanceTypeName() != null) { - this.maxDegreeVertex.setInstanceType(program.getInstanceTypeName()); - } else { - LOG.warn("No instance type assigned to JobVertex."); - } for (AbstractJobVertex vertex : this.vertices.values()) { if (vertex != this.maxDegreeVertex) { vertex.setVertexToShareInstancesWith(this.maxDegreeVertex); @@ -231,7 +221,7 @@ public class NepheleJobGraphGenerator implements Visitor { * @param node * The node that is currently processed. * @return True, if the visitor should descend to the node's children, false if not. - * @see eu.stratosphere.util.Visitor#preVisit(eu.stratosphere.pact.common.plan.Visitable) + * @see eu.stratosphere.util.Visitor#preVisit(eu.stratosphere.util.Visitable) */ @Override public boolean preVisit(PlanNode node) { @@ -260,8 +250,7 @@ public class NepheleJobGraphGenerator implements Visitor { // operator with the tail, if they have the same DOP. not merging is currently not // implemented PlanNode root = iterationNode.getRootOfStepFunction(); - if (root.getDegreeOfParallelism() != node.getDegreeOfParallelism() || - root.getSubtasksPerInstance() != node.getSubtasksPerInstance()) + if (root.getDegreeOfParallelism() != node.getDegreeOfParallelism()) { throw new CompilerException("Error: The final operator of the step " + "function has a different degree of parallelism than the iteration operator itself."); @@ -278,14 +267,12 @@ public class NepheleJobGraphGenerator implements Visitor { PlanNode nextWorkSet = iterationNode.getNextWorkSetPlanNode(); PlanNode solutionSetDelta = iterationNode.getSolutionSetDeltaPlanNode(); - if (nextWorkSet.getDegreeOfParallelism() != node.getDegreeOfParallelism() || - nextWorkSet.getSubtasksPerInstance() != node.getSubtasksPerInstance()) + if (nextWorkSet.getDegreeOfParallelism() != node.getDegreeOfParallelism()) { throw new CompilerException("It is currently not supported that the final operator of the step " + "function has a different degree of parallelism than the iteration operator itself."); } - if (solutionSetDelta.getDegreeOfParallelism() != node.getDegreeOfParallelism() || - solutionSetDelta.getSubtasksPerInstance() != node.getSubtasksPerInstance()) + if (solutionSetDelta.getDegreeOfParallelism() != node.getDegreeOfParallelism()) { throw new CompilerException("It is currently not supported that the final operator of the step " + "function has a different degree of parallelism than the iteration operator itself."); @@ -364,11 +351,6 @@ public class NepheleJobGraphGenerator implements Visitor { if (this.maxDegreeVertex == null || this.maxDegreeVertex.getNumberOfSubtasks() < pd) { this.maxDegreeVertex = vertex; } - - // set the number of tasks per instance - if (node.getSubtasksPerInstance() >= 1) { - vertex.setNumberOfSubtasksPerInstance(node.getSubtasksPerInstance()); - } // check whether this vertex is part of an iteration step function if (this.currentIteration != null) { @@ -377,10 +359,7 @@ public class NepheleJobGraphGenerator implements Visitor { if (iterationNode.getDegreeOfParallelism() < pd) { throw new CompilerException("Error: All functions that are part of an iteration must have the same, or a lower, degree-of-parallelism than the iteration operator."); } - if (iterationNode.getSubtasksPerInstance() < node.getSubtasksPerInstance()) { - throw new CompilerException("Error: All functions that are part of an iteration must have the same, or a lower, number of subtasks-per-node than the iteration operator."); - } - + // store the id of the iterations the step functions participate in IterationDescriptor descr = this.iterations.get(this.currentIteration); new TaskConfig(vertex.getConfiguration()).setIterationId(descr.getId()); @@ -401,7 +380,7 @@ public class NepheleJobGraphGenerator implements Visitor { * * @param node * The node currently processed during the post-visit. - * @see eu.stratosphere.util.Visitor#postVisit(eu.stratosphere.pact.common.plan.Visitable) + * @see eu.stratosphere.util.Visitor#postVisit(eu.stratosphere.util.Visitable) t */ @Override public void postVisit(PlanNode node) { @@ -739,7 +718,6 @@ public class NepheleJobGraphGenerator implements Visitor { inConn.getLocalStrategy() == LocalStrategy.NONE && pred.getOutgoingChannels().size() == 1 && node.getDegreeOfParallelism() == pred.getDegreeOfParallelism() && - node.getSubtasksPerInstance() == pred.getSubtasksPerInstance() && node.getBroadcastInputs().isEmpty(); // cannot chain the nodes that produce the next workset or the next solution set, if they are not the @@ -879,7 +857,6 @@ public class NepheleJobGraphGenerator implements Visitor { c.getLocalStrategy() == LocalStrategy.NONE && c.getTempMode() == TempMode.NONE && successor.getDegreeOfParallelism() == pspn.getDegreeOfParallelism() && - successor.getSubtasksPerInstance() == pspn.getSubtasksPerInstance() && !(successor instanceof NAryUnionPlanNode) && successor != iteration.getRootOfStepFunction() && iteration.getInput().getLocalStrategy() == LocalStrategy.NONE; @@ -948,7 +925,6 @@ public class NepheleJobGraphGenerator implements Visitor { c.getLocalStrategy() == LocalStrategy.NONE && c.getTempMode() == TempMode.NONE && successor.getDegreeOfParallelism() == wspn.getDegreeOfParallelism() && - successor.getSubtasksPerInstance() == wspn.getSubtasksPerInstance() && !(successor instanceof NAryUnionPlanNode) && successor != iteration.getNextWorkSetPlanNode() && iteration.getInitialWorksetInput().getLocalStrategy() == LocalStrategy.NONE; @@ -995,17 +971,17 @@ public class NepheleJobGraphGenerator implements Visitor { } private void assignDriverResources(PlanNode node, TaskConfig config) { - final long mem = node.getMemoryPerSubTask(); - if (mem > 0) { - config.setMemoryDriver(mem); + final double relativeMem = node.getRelativeMemoryPerSubTask(); + if (relativeMem > 0) { + config.setRelativeMemoryDriver(relativeMem); config.setFilehandlesDriver(this.defaultMaxFan); config.setSpillingThresholdDriver(this.defaultSortSpillingThreshold); } } private void assignLocalStrategyResources(Channel c, TaskConfig config, int inputNum) { - if (c.getMemoryLocalStrategy() > 0) { - config.setMemoryInput(inputNum, c.getMemoryLocalStrategy()); + if (c.getRelativeMemoryLocalStrategy() > 0) { + config.setRelativeMemoryInput(inputNum, c.getRelativeMemoryLocalStrategy()); config.setFilehandlesInput(inputNum, this.defaultMaxFan); config.setSpillingThresholdInput(inputNum, this.defaultSortSpillingThreshold); } @@ -1020,13 +996,13 @@ public class NepheleJobGraphGenerator implements Visitor { * channel is then the channel into the union node, the local strategy channel the one from the union to the * actual target operator. * - * @param channelForGlobalStrategy - * @param channelForLocalStrategy + * @param channel * @param inputNumber * @param sourceVertex * @param sourceConfig * @param targetVertex * @param targetConfig + * @param isBroadcast * @throws JobGraphDefinitionException * @throws CompilerException */ @@ -1133,10 +1109,10 @@ public class NepheleJobGraphGenerator implements Visitor { if (needsMemory) { // sanity check - if (tm == null || tm == TempMode.NONE || channel.getTempMemory() < 1) { + if (tm == null || tm == TempMode.NONE || channel.getRelativeTempMemory() <= 0) { throw new CompilerException("Bug in compiler: Inconsistent description of input materialization."); } - config.setInputMaterializationMemory(inputNum, channel.getTempMemory()); + config.setRelativeInputMaterializationMemory(inputNum, channel.getRelativeTempMemory()); } } } @@ -1153,11 +1129,11 @@ public class NepheleJobGraphGenerator implements Visitor { final int numFinalOuts = headFinalOutputConfig.getNumOutputs(); headConfig.setIterationHeadFinalOutputConfig(headFinalOutputConfig); headConfig.setIterationHeadIndexOfSyncOutput(numStepFunctionOuts + numFinalOuts); - final long memForBackChannel = bulkNode.getMemoryPerSubTask(); - if (memForBackChannel <= 0) { + final double relativeMemForBackChannel = bulkNode.getRelativeMemoryPerSubTask(); + if (relativeMemForBackChannel <= 0) { throw new CompilerException("Bug: No memory has been assigned to the iteration back channel."); } - headConfig.setBackChannelMemory(memForBackChannel); + headConfig.setRelativeBackChannelMemory(relativeMemForBackChannel); // --------------------------- create the sync task --------------------------- final JobOutputVertex sync = new JobOutputVertex("Sync(" + @@ -1219,7 +1195,6 @@ public class NepheleJobGraphGenerator implements Visitor { JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph); fakeTail.setOutputClass(FakeOutputTask.class); fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks()); - fakeTail.setNumberOfSubtasksPerInstance(headVertex.getNumberOfSubtasksPerInstance()); this.auxVertices.add(fakeTail); // connect the fake tail @@ -1262,7 +1237,6 @@ public class NepheleJobGraphGenerator implements Visitor { JobOutputVertex fakeTailTerminationCriterion = new JobOutputVertex("Fake Tail for Termination Criterion", this.jobGraph); fakeTailTerminationCriterion.setOutputClass(FakeOutputTask.class); fakeTailTerminationCriterion.setNumberOfSubtasks(headVertex.getNumberOfSubtasks()); - fakeTailTerminationCriterion.setNumberOfSubtasksPerInstance(headVertex.getNumberOfSubtasksPerInstance()); this.auxVertices.add(fakeTailTerminationCriterion); // connect the fake tail @@ -1310,14 +1284,14 @@ public class NepheleJobGraphGenerator implements Visitor { final int numFinalOuts = headFinalOutputConfig.getNumOutputs(); headConfig.setIterationHeadFinalOutputConfig(headFinalOutputConfig); headConfig.setIterationHeadIndexOfSyncOutput(numStepFunctionOuts + numFinalOuts); - final long mem = iterNode.getMemoryPerSubTask(); - if (mem <= 0) { + final double relativeMemory = iterNode.getRelativeMemoryPerSubTask(); + if (relativeMemory <= 0) { throw new CompilerException("Bug: No memory has been assigned to the workset iteration."); } headConfig.setIsWorksetIteration(); - headConfig.setBackChannelMemory(mem / 2); - headConfig.setSolutionSetMemory(mem / 2); + headConfig.setRelativeBackChannelMemory(relativeMemory / 2); + headConfig.setRelativeSolutionSetMemory(relativeMemory / 2); // set the solution set serializer and comparator headConfig.setSolutionSetSerializer(iterNode.getSolutionSetSerializer()); @@ -1396,7 +1370,6 @@ public class NepheleJobGraphGenerator implements Visitor { JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph); fakeTail.setOutputClass(FakeOutputTask.class); fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks()); - fakeTail.setNumberOfSubtasksPerInstance(headVertex.getNumberOfSubtasksPerInstance()); this.auxVertices.add(fakeTail); // connect the fake tail @@ -1435,7 +1408,6 @@ public class NepheleJobGraphGenerator implements Visitor { JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph); fakeTail.setOutputClass(FakeOutputTask.class); fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks()); - fakeTail.setNumberOfSubtasksPerInstance(headVertex.getNumberOfSubtasksPerInstance()); this.auxVertices.add(fakeTail); // connect the fake tail @@ -1502,9 +1474,9 @@ public class NepheleJobGraphGenerator implements Visitor { private AbstractJobVertex containingVertex; - @SuppressWarnings("unchecked") - TaskInChain(@SuppressWarnings("rawtypes") Class chainedTask, TaskConfig taskConfig, String taskName) { - this.chainedTask = (Class>) chainedTask; + TaskInChain(Class> chainedTask, TaskConfig taskConfig, + String taskName) { + this.chainedTask = chainedTask; this.taskConfig = taskConfig; this.taskName = taskName; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CompilerTestBase.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CompilerTestBase.java b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CompilerTestBase.java index e0da85d..f534ad9 100644 --- a/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CompilerTestBase.java +++ b/stratosphere-compiler/src/test/java/eu/stratosphere/pact/compiler/CompilerTestBase.java @@ -12,7 +12,6 @@ **********************************************************************************************************************/ package eu.stratosphere.pact.compiler; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -37,12 +36,6 @@ import eu.stratosphere.compiler.costs.DefaultCostEstimator; import eu.stratosphere.compiler.plan.OptimizedPlan; import eu.stratosphere.compiler.plan.PlanNode; import eu.stratosphere.compiler.plan.SingleInputPlanNode; -import eu.stratosphere.nephele.instance.HardwareDescription; -import eu.stratosphere.nephele.instance.HardwareDescriptionFactory; -import eu.stratosphere.nephele.instance.InstanceType; -import eu.stratosphere.nephele.instance.InstanceTypeDescription; -import eu.stratosphere.nephele.instance.InstanceTypeDescriptionFactory; -import eu.stratosphere.nephele.instance.InstanceTypeFactory; import eu.stratosphere.util.LogUtils; import eu.stratosphere.util.OperatingSystem; import eu.stratosphere.util.Visitor; @@ -72,8 +65,6 @@ public abstract class CompilerTestBase implements java.io.Serializable { protected transient PactCompiler noStatsCompiler; - protected transient InstanceTypeDescription instanceType; - private transient int statCounter; // ------------------------------------------------------------------------ @@ -85,29 +76,22 @@ public abstract class CompilerTestBase implements java.io.Serializable { @Before public void setup() { - InetSocketAddress dummyAddr = new InetSocketAddress("localhost", 12345); - this.dataStats = new DataStatistics(); - this.withStatsCompiler = new PactCompiler(this.dataStats, new DefaultCostEstimator(), dummyAddr); + this.withStatsCompiler = new PactCompiler(this.dataStats, new DefaultCostEstimator()); this.withStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM); - this.noStatsCompiler = new PactCompiler(null, new DefaultCostEstimator(), dummyAddr); + this.noStatsCompiler = new PactCompiler(null, new DefaultCostEstimator()); this.noStatsCompiler.setDefaultDegreeOfParallelism(DEFAULT_PARALLELISM); - - // create the instance type description - InstanceType iType = InstanceTypeFactory.construct("standard", 6, 2, 4096, 100, 0); - HardwareDescription hDesc = HardwareDescriptionFactory.construct(2, 4096 * 1024 * 1024, 2000 * 1024 * 1024); - this.instanceType = InstanceTypeDescriptionFactory.construct(iType, hDesc, DEFAULT_PARALLELISM * 2); } // ------------------------------------------------------------------------ public OptimizedPlan compileWithStats(Plan p) { - return this.withStatsCompiler.compile(p, this.instanceType); + return this.withStatsCompiler.compile(p); } public OptimizedPlan compileNoStats(Plan p) { - return this.noStatsCompiler.compile(p, this.instanceType); + return this.noStatsCompiler.compile(p); } public void setSourceStatistics(GenericDataSourceBase source, long size, float recordWidth) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java b/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java index 0e4177e..eff48cc 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/configuration/ConfigConstants.java @@ -99,6 +99,11 @@ public final class ConfigConstants { public static final String TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY = "taskmanager.network.bufferSizeInBytes"; /** + * The config parameter defining the number of task slots of a task manager. + */ + public static final String TASK_MANAGER_NUM_TASK_SLOTS = "taskmanager.numberOfTaskSlots"; + + /** * The number of incoming network IO threads (e.g. incoming connection threads used in NettyConnectionManager * for the ServerBootstrap.) */ @@ -290,12 +295,7 @@ public final class ConfigConstants { /** * The default degree of parallelism for operations. */ - public static final int DEFAULT_PARALLELIZATION_DEGREE = -1; - - /** - * The default intra-node parallelism. - */ - public static final int DEFAULT_MAX_INTRA_NODE_PARALLELIZATION_DEGREE = -1; + public static final int DEFAULT_PARALLELIZATION_DEGREE = 1; // ------------------------------ Runtime --------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-core/src/main/java/eu/stratosphere/util/ClassUtils.java ---------------------------------------------------------------------- diff --git a/stratosphere-core/src/main/java/eu/stratosphere/util/ClassUtils.java b/stratosphere-core/src/main/java/eu/stratosphere/util/ClassUtils.java index 96be666..f79dd2a 100644 --- a/stratosphere-core/src/main/java/eu/stratosphere/util/ClassUtils.java +++ b/stratosphere-core/src/main/java/eu/stratosphere/util/ClassUtils.java @@ -40,6 +40,7 @@ public final class ClassUtils { throws ClassNotFoundException { if (!className.contains("Protocol")) { + System.out.println(className); throw new ClassNotFoundException("Only use this method for protocols!"); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexAssignmentEvent.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexAssignmentEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexAssignmentEvent.java index f01e62d..c86c12b 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexAssignmentEvent.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/event/job/VertexAssignmentEvent.java @@ -38,11 +38,6 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage private String instanceName; /** - * The type of the instance the vertex is now assigned to. - */ - private String instanceType; - - /** * Constructs a new event. * * @param timestamp @@ -51,16 +46,13 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage * identifies the vertex this event refers to * @param instanceName * the name of the instance the vertex is now assigned to - * @param instanceType - * the type of the instance the vertex is now assigned to */ public VertexAssignmentEvent(final long timestamp, final ManagementVertexID managementVertexID, - final String instanceName, final String instanceType) { + final String instanceName) { super(timestamp); this.managementVertexID = managementVertexID; this.instanceName = instanceName; - this.instanceType = instanceType; } /** @@ -90,16 +82,6 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage return this.instanceName; } - /** - * Returns the type of the instance the vertex is now assigned to. - * - * @return the type of the instance the vertex is now assigned to - */ - public String getInstanceType() { - return this.instanceType; - } - - @Override public void read(final DataInput in) throws IOException { @@ -107,7 +89,6 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage this.managementVertexID.read(in); this.instanceName = StringRecord.readString(in); - this.instanceType = StringRecord.readString(in); } @@ -118,7 +99,6 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage this.managementVertexID.write(out); StringRecord.writeString(out, this.instanceName); - StringRecord.writeString(out, this.instanceType); } @@ -149,16 +129,6 @@ public final class VertexAssignmentEvent extends AbstractEvent implements Manage } } - if (this.instanceType == null) { - if (vae.getInstanceType() != null) { - return false; - } - } else { - if (!this.instanceType.equals(vae.getInstanceType())) { - return false; - } - } - return true; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionEdge.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionEdge.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionEdge.java index 0106361..920c47e 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionEdge.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionEdge.java @@ -19,7 +19,6 @@ import eu.stratosphere.runtime.io.channels.ChannelType; /** * Objects of this class represent a pair of {@link eu.stratosphere.runtime.io.serialization.io.channels.InputChannel} and {@link AbstractOutputChannel} objects * within an {@link ExecutionGraph}, Nephele's internal scheduling representation for jobs. - * */ public final class ExecutionEdge { @@ -51,42 +50,34 @@ public final class ExecutionEdge { } public ExecutionGate getInputGate() { - return this.inputGate; } public ExecutionGate getOutputGate() { - return this.outputGate; } public ChannelID getOutputChannelID() { - return this.outputChannelID; } public ChannelID getInputChannelID() { - return this.inputChannelID; } public int getOutputGateIndex() { - return this.outputGateIndex; } public int getInputGateIndex() { - return this.inputGateIndex; } public ChannelType getChannelType() { - return this.groupEdge.getChannelType(); } public int getConnectionID() { - return this.groupEdge.getConnectionID(); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java index ca7eddb..c5059f9 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGraph.java @@ -38,8 +38,6 @@ import eu.stratosphere.nephele.execution.ExecutionListener; import eu.stratosphere.nephele.execution.ExecutionState; import eu.stratosphere.nephele.instance.AllocatedResource; import eu.stratosphere.nephele.instance.DummyInstance; -import eu.stratosphere.nephele.instance.InstanceManager; -import eu.stratosphere.nephele.instance.InstanceType; import eu.stratosphere.nephele.jobgraph.DistributionPattern; import eu.stratosphere.runtime.io.gates.GateID; import eu.stratosphere.runtime.io.channels.ChannelID; @@ -160,18 +158,18 @@ public class ExecutionGraph implements ExecutionListener { * * @param job * the user's job graph - * @param instanceManager - * the instance manager + * @param defaultParallelism + * defaultParallelism in case that nodes have no parallelism set * @throws GraphConversionException * thrown if the job graph is not valid and no execution graph can be constructed from it */ - public ExecutionGraph(final JobGraph job, final InstanceManager instanceManager) + public ExecutionGraph(final JobGraph job, final int defaultParallelism) throws GraphConversionException { this(job.getJobID(), job.getName(), job.getJobConfiguration()); // Start constructing the new execution graph from given job graph try { - constructExecutionGraph(job, instanceManager); + constructExecutionGraph(job, defaultParallelism); } catch (GraphConversionException e) { throw e; // forward graph conversion exceptions } catch (Exception e) { @@ -217,7 +215,6 @@ public class ExecutionGraph implements ExecutionListener { final ExecutionGroupVertex groupVertex = it2.next(); if (groupVertex.isNumberOfMembersUserDefined()) { groupVertex.createInitialExecutionVertices(groupVertex.getUserDefinedNumberOfMembers()); - groupVertex.repairSubtasksPerInstance(); } } @@ -253,12 +250,12 @@ public class ExecutionGraph implements ExecutionListener { * * @param jobGraph * the job graph to create the execution graph from - * @param instanceManager - * the instance manager + * @param defaultParallelism + * defaultParallelism in case that nodes have no parallelism set * @throws GraphConversionException * thrown if the job graph is not valid and no execution graph can be constructed from it */ - private void constructExecutionGraph(final JobGraph jobGraph, final InstanceManager instanceManager) + private void constructExecutionGraph(final JobGraph jobGraph, final int defaultParallelism) throws GraphConversionException { // Clean up temporary data structures @@ -272,8 +269,11 @@ public class ExecutionGraph implements ExecutionListener { // Convert job vertices to execution vertices and initialize them final AbstractJobVertex[] all = jobGraph.getAllJobVertices(); for (int i = 0; i < all.length; i++) { - final ExecutionVertex createdVertex = createVertex(all[i], instanceManager, initialExecutionStage, - jobGraph.getJobConfiguration()); + if(all[i].getNumberOfSubtasks() == -1){ + all[i].setNumberOfSubtasks(defaultParallelism); + } + + final ExecutionVertex createdVertex = createVertex(all[i], initialExecutionStage); temporaryVertexMap.put(all[i], createdVertex); temporaryGroupVertexMap.put(all[i], createdVertex.getGroupVertex()); } @@ -444,37 +444,15 @@ public class ExecutionGraph implements ExecutionListener { * * @param jobVertex * the job vertex to create the execution vertex from - * @param instanceManager - * the instanceManager * @param initialExecutionStage * the initial execution stage all group vertices are added to - * @param jobConfiguration - * the configuration object originally attached to the {@link JobGraph} * @return the new execution vertex * @throws GraphConversionException * thrown if the job vertex is of an unknown subclass */ - private ExecutionVertex createVertex(final AbstractJobVertex jobVertex, final InstanceManager instanceManager, - final ExecutionStage initialExecutionStage, final Configuration jobConfiguration) + private ExecutionVertex createVertex(final AbstractJobVertex jobVertex, final ExecutionStage initialExecutionStage) throws GraphConversionException { - // If the user has requested instance type, check if the type is known by the current instance manager - InstanceType instanceType = null; - boolean userDefinedInstanceType = false; - if (jobVertex.getInstanceType() != null) { - - userDefinedInstanceType = true; - instanceType = instanceManager.getInstanceTypeByName(jobVertex.getInstanceType()); - if (instanceType == null) { - throw new GraphConversionException("Requested instance type " + jobVertex.getInstanceType() - + " is not known to the instance manager"); - } - } - - if (instanceType == null) { - instanceType = instanceManager.getDefaultInstanceType(); - } - // Create an initial execution vertex for the job vertex final Class invokableClass = jobVertex.getInvokableClass(); if (invokableClass == null) { @@ -491,8 +469,7 @@ public class ExecutionGraph implements ExecutionListener { ExecutionGroupVertex groupVertex = null; try { groupVertex = new ExecutionGroupVertex(jobVertex.getName(), jobVertex.getID(), this, - jobVertex.getNumberOfSubtasks(), instanceType, userDefinedInstanceType, - jobVertex.getNumberOfSubtasksPerInstance(), jobVertex.getVertexToShareInstancesWith() != null ? true + jobVertex.getNumberOfSubtasks(), jobVertex.getVertexToShareInstancesWith() != null ? true : false, jobVertex.getNumberOfExecutionRetries(), jobVertex.getConfiguration(), signature, invokableClass); } catch (Throwable t) { @@ -506,39 +483,6 @@ public class ExecutionGraph implements ExecutionListener { throw new GraphConversionException(StringUtils.stringifyException(e)); } - // Check if the user's specifications for the number of subtasks are valid - final int minimumNumberOfSubtasks = jobVertex.getMinimumNumberOfSubtasks(groupVertex.getEnvironment() - .getInvokable()); - final int maximumNumberOfSubtasks = jobVertex.getMaximumNumberOfSubtasks(groupVertex.getEnvironment() - .getInvokable()); - if (jobVertex.getNumberOfSubtasks() != -1) { - if (jobVertex.getNumberOfSubtasks() < 1) { - throw new GraphConversionException("Cannot split task " + jobVertex.getName() + " into " - + jobVertex.getNumberOfSubtasks() + " subtasks"); - } - - if (jobVertex.getNumberOfSubtasks() < minimumNumberOfSubtasks) { - throw new GraphConversionException("Number of subtasks must be at least " + minimumNumberOfSubtasks); - } - - if (maximumNumberOfSubtasks != -1) { - if (jobVertex.getNumberOfSubtasks() > maximumNumberOfSubtasks) { - throw new GraphConversionException("Number of subtasks for vertex " + jobVertex.getName() - + " can be at most " + maximumNumberOfSubtasks); - } - } - } - - // Check number of subtasks per instance - if (jobVertex.getNumberOfSubtasksPerInstance() != -1 && jobVertex.getNumberOfSubtasksPerInstance() < 1) { - throw new GraphConversionException("Cannot set number of subtasks per instance to " - + jobVertex.getNumberOfSubtasksPerInstance() + " for vertex " + jobVertex.getName()); - } - - // Assign min/max to the group vertex (settings are actually applied in applyUserDefinedSettings) - groupVertex.setMinMemberSize(minimumNumberOfSubtasks); - groupVertex.setMaxMemberSize(maximumNumberOfSubtasks); - // Register input and output vertices separately if (jobVertex instanceof AbstractJobInputVertex) { @@ -579,8 +523,7 @@ public class ExecutionGraph implements ExecutionListener { jobVertex.getNumberOfBackwardConnections()); // Assign initial instance to vertex (may be overwritten later on when user settings are applied) - ev.setAllocatedResource(new AllocatedResource(DummyInstance.createDummyInstance(instanceType), instanceType, - null)); + ev.setAllocatedResource(new AllocatedResource(DummyInstance.createDummyInstance(), null)); return ev; } @@ -853,6 +796,48 @@ public class ExecutionGraph implements ExecutionListener { } /** + * Retrieves the maximum parallel degree of the job represented by this execution graph + */ + public int getMaxNumberSubtasks() { + int maxDegree = 0; + final Iterator stageIterator = this.stages.iterator(); + + while(stageIterator.hasNext()){ + final ExecutionStage stage = stageIterator.next(); + + int maxPerStageDegree = stage.getMaxNumberSubtasks(); + + if(maxPerStageDegree > maxDegree){ + maxDegree = maxPerStageDegree; + } + } + + return maxDegree; + } + + /** + * Retrieves the number of required slots to run this execution graph + * @return + */ + public int getRequiredSlots(){ + int maxRequiredSlots = 0; + + final Iterator stageIterator = this.stages.iterator(); + + while(stageIterator.hasNext()){ + final ExecutionStage stage = stageIterator.next(); + + int requiredSlots = stage.getRequiredSlots(); + + if(requiredSlots > maxRequiredSlots){ + maxRequiredSlots = requiredSlots; + } + } + + return maxRequiredSlots; + } + + /** * Returns the stage which is currently executed. * * @return the currently executed stage or null if the job execution is already completed @@ -1318,25 +1303,16 @@ public class ExecutionGraph implements ExecutionListener { return this.jobName; } - @Override - public void userThreadStarted(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) { - // TODO Auto-generated method stub - - } - + public void userThreadStarted(JobID jobID, ExecutionVertexID vertexID, Thread userThread) {} @Override - public void userThreadFinished(final JobID jobID, final ExecutionVertexID vertexID, final Thread userThread) { - // TODO Auto-generated method stub - - } + public void userThreadFinished(JobID jobID, ExecutionVertexID vertexID, Thread userThread) {} /** * Reconstructs the execution pipelines for the entire execution graph. */ private void reconstructExecutionPipelines() { - final Iterator it = this.stages.iterator(); while (it.hasNext()) { @@ -1345,39 +1321,17 @@ public class ExecutionGraph implements ExecutionListener { } /** - * Calculates the connection IDs of the graph to avoid deadlocks in the data flow at runtime. - */ - private void calculateConnectionIDs() { - - final Set alreadyVisited = new HashSet(); - final ExecutionStage lastStage = getStage(getNumberOfStages() - 1); - - for (int i = 0; i < lastStage.getNumberOfStageMembers(); ++i) { - - final ExecutionGroupVertex groupVertex = lastStage.getStageMember(i); - - int currentConnectionID = 0; - - if (groupVertex.isOutputVertex()) { - currentConnectionID = groupVertex.calculateConnectionID(currentConnectionID, alreadyVisited); - } - } - } - - /** * Returns an iterator over all execution stages contained in this graph. * * @return an iterator over all execution stages contained in this graph */ public Iterator iterator() { - return this.stages.iterator(); } @Override public int getPriority() { - return 1; } @@ -1388,7 +1342,22 @@ public class ExecutionGraph implements ExecutionListener { * the update command to be asynchronously executed on this graph */ public void executeCommand(final Runnable command) { - this.executorService.execute(command); } + + private void calculateConnectionIDs() { + final Set alreadyVisited = new HashSet(); + final ExecutionStage lastStage = getStage(getNumberOfStages() - 1); + + for (int i = 0; i < lastStage.getNumberOfStageMembers(); ++i) { + + final ExecutionGroupVertex groupVertex = lastStage.getStageMember(i); + + int currentConnectionID = 0; + + if (groupVertex.isOutputVertex()) { + currentConnectionID = groupVertex.calculateConnectionID(currentConnectionID, alreadyVisited); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java index 89b4b6d..c865609 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionGroupVertex.java @@ -18,7 +18,6 @@ import eu.stratosphere.core.io.InputSplit; import eu.stratosphere.nephele.execution.RuntimeEnvironment; import eu.stratosphere.nephele.instance.AllocatedResource; import eu.stratosphere.nephele.instance.DummyInstance; -import eu.stratosphere.nephele.instance.InstanceType; import eu.stratosphere.nephele.jobgraph.JobVertexID; import eu.stratosphere.nephele.template.AbstractInvokable; import eu.stratosphere.runtime.io.channels.ChannelType; @@ -69,41 +68,11 @@ public final class ExecutionGroupVertex { private final CopyOnWriteArrayList groupMembers = new CopyOnWriteArrayList(); /** - * Maximum number of execution vertices this group vertex can manage. - */ - private volatile int maxMemberSize = 1; - - /** - * Minimum number of execution vertices this group vertex can manage. - */ - private volatile int minMemberSize = 1; - - /** * The user defined number of execution vertices, -1 if the user has not specified it. */ private final int userDefinedNumberOfMembers; /** - * The instance type to be used for execution vertices this group vertex manages. - */ - private volatile InstanceType instanceType = null; - - /** - * Stores whether the instance type is user defined. - */ - private final boolean userDefinedInstanceType; - - /** - * Stores the number of subtasks per instance. - */ - private volatile int numberOfSubtasksPerInstance = -1; - - /** - * Stores whether the number of subtasks per instance is user defined. - */ - private final boolean userDefinedNumberOfSubtasksPerInstance; - - /** * Number of retries in case of an error before the task represented by this vertex is considered as failed. */ private final int numberOfExecutionRetries; @@ -175,12 +144,6 @@ public final class ExecutionGroupVertex { * the execution graph is group vertex belongs to * @param userDefinedNumberOfMembers * the user defined number of subtasks, -1 if the user did not specify the number - * @param instanceType - * the instance type to be used for execution vertices this group vertex manages. - * @param userDefinedInstanceType - * true if the instance type is user defined, false otherwise - * @param numberOfSubtasksPerInstance - * the user defined number of subtasks per instance, -1 if the user did not specify the number * @param userDefinedVertexToShareInstanceWith * true if the user specified another vertex to share instances with, false * otherwise @@ -197,24 +160,13 @@ public final class ExecutionGroupVertex { * throws if an error occurs while instantiating the {@link AbstractInvokable} */ public ExecutionGroupVertex(final String name, final JobVertexID jobVertexID, final ExecutionGraph executionGraph, - final int userDefinedNumberOfMembers, final InstanceType instanceType, - final boolean userDefinedInstanceType, final int numberOfSubtasksPerInstance, - final boolean userDefinedVertexToShareInstanceWith, final int numberOfExecutionRetries, - final Configuration configuration, final ExecutionSignature signature, + final int userDefinedNumberOfMembers, final boolean userDefinedVertexToShareInstanceWith, + final int numberOfExecutionRetries, final Configuration configuration, final ExecutionSignature signature, final Class invokableClass) throws Exception { this.name = (name != null) ? name : ""; this.jobVertexID = jobVertexID; this.userDefinedNumberOfMembers = userDefinedNumberOfMembers; - this.instanceType = instanceType; - this.userDefinedInstanceType = userDefinedInstanceType; - if (numberOfSubtasksPerInstance != -1) { - this.numberOfSubtasksPerInstance = numberOfSubtasksPerInstance; - this.userDefinedNumberOfSubtasksPerInstance = true; - } else { - this.numberOfSubtasksPerInstance = 1; - this.userDefinedNumberOfSubtasksPerInstance = false; - } if (numberOfExecutionRetries >= 0) { this.numberOfExecutionRetries = numberOfExecutionRetries; } else { @@ -309,32 +261,6 @@ public final class ExecutionGroupVertex { } /** - * Sets the maximum number of members this group vertex can have. - * - * @param maxSize - * the maximum number of members this group vertex can have - */ - void setMaxMemberSize(final int maxSize) { - - // TODO: Add checks here - - this.maxMemberSize = maxSize; - } - - /** - * Sets the minimum number of members this group vertex must have. - * - * @param minSize - * the minimum number of members this group vertex must have - */ - void setMinMemberSize(final int minSize) { - - // TODO: Add checks here - - this.minMemberSize = minSize; - } - - /** * Returns the current number of members this group vertex has. * * @return the current number of members this group vertex has @@ -345,24 +271,6 @@ public final class ExecutionGroupVertex { } /** - * Returns the maximum number of members this group vertex can have. - * - * @return the maximum number of members this group vertex can have - */ - public int getMaximumNumberOfGroupMembers() { - return this.maxMemberSize; - } - - /** - * Returns the minimum number of members this group vertex must have. - * - * @return the minimum number of members this group vertex must have - */ - public int getMinimumNumberOfGroupMember() { - return this.minMemberSize; - } - - /** * Wires this group vertex to the specified group vertex and creates * a back link. * @@ -376,10 +284,6 @@ public final class ExecutionGroupVertex { * the channel type to be used for this edge * @param userDefinedChannelType * true if the channel type is user defined, false otherwise - * @param compressionLevel - * the compression level to be used for this edge - * @param userDefinedCompressionLevel - * true if the compression level is user defined, false otherwise * @param distributionPattern * the distribution pattern to create the wiring between the group members * @param isBroadcast @@ -480,10 +384,10 @@ public final class ExecutionGroupVertex { * @throws GraphConversionException * thrown if the number of execution vertices for this group vertex cannot be set to the desired value */ - void createInitialExecutionVertices(final int initalNumberOfVertices) throws GraphConversionException { + void createInitialExecutionVertices(final int initialNumberOfVertices) throws GraphConversionException { // If the requested number of group vertices does not change, do nothing - if (initalNumberOfVertices == this.getCurrentNumberOfGroupMembers()) { + if (initialNumberOfVertices == this.getCurrentNumberOfGroupMembers()) { return; } @@ -517,25 +421,14 @@ public final class ExecutionGroupVertex { * } */ - if (initalNumberOfVertices < this.getMinimumNumberOfGroupMember()) { - throw new GraphConversionException("Number of members must be at least " - + this.getMinimumNumberOfGroupMember()); - } - - if ((this.getMaximumNumberOfGroupMembers() != -1) - && (initalNumberOfVertices > this.getMaximumNumberOfGroupMembers())) { - throw new GraphConversionException("Number of members cannot exceed " - + this.getMaximumNumberOfGroupMembers()); - } - final ExecutionVertex originalVertex = this.getGroupMember(0); int currentNumberOfExecutionVertices = this.getCurrentNumberOfGroupMembers(); - while (currentNumberOfExecutionVertices++ < initalNumberOfVertices) { + while (currentNumberOfExecutionVertices++ < initialNumberOfVertices) { final ExecutionVertex vertex = originalVertex.splitVertex(); vertex.setAllocatedResource(new AllocatedResource(DummyInstance - .createDummyInstance(this.instanceType), this.instanceType, null)); + .createDummyInstance(), null)); this.groupMembers.add(vertex); } @@ -645,53 +538,6 @@ public final class ExecutionGroupVertex { return this.userDefinedNumberOfMembers; } - boolean isInstanceTypeUserDefined() { - - return this.userDefinedInstanceType; - } - - void setInstanceType(final InstanceType instanceType) throws GraphConversionException { - - if (instanceType == null) { - throw new IllegalArgumentException("Argument instanceType must not be null"); - } - - if (this.userDefinedInstanceType) { - throw new GraphConversionException("Cannot overwrite user defined instance type " - + instanceType.getIdentifier()); - } - - this.instanceType = instanceType; - - // Reset instance allocation of all members and let reassignInstances do the work - for (int i = 0; i < this.groupMembers.size(); i++) { - final ExecutionVertex vertex = this.groupMembers.get(i); - vertex.setAllocatedResource(null); - } - } - - InstanceType getInstanceType() { - return this.instanceType; - } - - boolean isNumberOfSubtasksPerInstanceUserDefined() { - - return this.userDefinedNumberOfSubtasksPerInstance; - } - - void setNumberOfSubtasksPerInstance(final int numberOfSubtasksPerInstance) throws GraphConversionException { - - if (this.userDefinedNumberOfSubtasksPerInstance - && (numberOfSubtasksPerInstance != this.numberOfSubtasksPerInstance)) { - throw new GraphConversionException("Cannot overwrite user defined number of subtasks per instance"); - } - - this.numberOfSubtasksPerInstance = numberOfSubtasksPerInstance; - } - - int getNumberOfSubtasksPerInstance() { - return this.numberOfSubtasksPerInstance; - } /** * Returns the number of retries in case of an error before the task represented by this vertex is considered as @@ -766,27 +612,13 @@ public final class ExecutionGroupVertex { } - void repairSubtasksPerInstance() { - - final Iterator it = this.groupMembers.iterator(); - int count = 0; - while (it.hasNext()) { - - final ExecutionVertex v = it.next(); - v.setAllocatedResource(this.groupMembers.get( - (count++ / this.numberOfSubtasksPerInstance) * this.numberOfSubtasksPerInstance) - .getAllocatedResource()); - } - } - void repairInstanceSharing(final Set availableResources) { // Number of required resources by this group vertex - final int numberOfRequiredInstances = (this.groupMembers.size() / this.numberOfSubtasksPerInstance) - + (((this.groupMembers.size() % this.numberOfSubtasksPerInstance) != 0) ? 1 : 0); + final int numberOfRequiredSlots = this.groupMembers.size(); // Number of resources to be replaced - final int resourcesToBeReplaced = Math.min(availableResources.size(), numberOfRequiredInstances); + final int resourcesToBeReplaced = Math.min(availableResources.size(), numberOfRequiredSlots); // Build the replacement map if necessary final Map replacementMap = new HashMap(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionStage.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionStage.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionStage.java index eab2375..df29aef 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionStage.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionStage.java @@ -15,18 +15,10 @@ package eu.stratosphere.nephele.executiongraph; import java.util.HashSet; import java.util.Iterator; -import java.util.Map; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import eu.stratosphere.nephele.execution.ExecutionState; -import eu.stratosphere.nephele.instance.AbstractInstance; -import eu.stratosphere.nephele.instance.DummyInstance; -import eu.stratosphere.nephele.instance.InstanceRequestMap; -import eu.stratosphere.nephele.instance.InstanceType; +import eu.stratosphere.nephele.instance.Instance; import eu.stratosphere.runtime.io.channels.ChannelType; /** @@ -35,16 +27,10 @@ import eu.stratosphere.runtime.io.channels.ChannelType; * job can only start to execute if the execution of its preceding stage is complete. *

* This class is thread-safe. - * */ public final class ExecutionStage { /** - * The log object used for debugging. - */ - private static final Log LOG = LogFactory.getLog(ExecutionStage.class); - - /** * The execution graph that this stage belongs to. */ private final ExecutionGraph executionGraph; @@ -242,69 +228,6 @@ public final class ExecutionStage { } /** - * Checks which instance types and how many instances of these types are required to execute this stage - * of the job graph. The required instance types and the number of instances are collected in the given map. Note - * that this method does not clear the map before collecting the instances. - * - * @param instanceRequestMap - * the map containing the instances types and the required number of instances of the respective type - * @param executionState - * the execution state the considered vertices must be in - */ - public void collectRequiredInstanceTypes(final InstanceRequestMap instanceRequestMap, - final ExecutionState executionState) { - - final Set collectedInstances = new HashSet(); - final ExecutionGroupVertexIterator groupIt = new ExecutionGroupVertexIterator(this.getExecutionGraph(), true, - this.stageNum); - - while (groupIt.hasNext()) { - - final ExecutionGroupVertex groupVertex = groupIt.next(); - final Iterator vertexIt = groupVertex.iterator(); - while (vertexIt.hasNext()) { - - // Get the instance type from the execution vertex if it - final ExecutionVertex vertex = vertexIt.next(); - if (vertex.getExecutionState() == executionState) { - final AbstractInstance instance = vertex.getAllocatedResource().getInstance(); - - if (collectedInstances.contains(instance)) { - continue; - } else { - collectedInstances.add(instance); - } - - if (instance instanceof DummyInstance) { - - final InstanceType instanceType = instance.getType(); - int num = instanceRequestMap.getMaximumNumberOfInstances(instanceType); - ++num; - instanceRequestMap.setMaximumNumberOfInstances(instanceType, num); - if (groupVertex.isInputVertex()) { - num = instanceRequestMap.getMinimumNumberOfInstances(instanceType); - ++num; - instanceRequestMap.setMinimumNumberOfInstances(instanceType, num); - } - } else { - LOG.debug("Execution Vertex " + vertex.getName() + " (" + vertex.getID() - + ") is already assigned to non-dummy instance, skipping..."); - } - } - } - } - - final Iterator> it = instanceRequestMap.getMaximumIterator(); - while (it.hasNext()) { - - final Map.Entry entry = it.next(); - if (instanceRequestMap.getMinimumNumberOfInstances(entry.getKey()) == 0) { - instanceRequestMap.setMinimumNumberOfInstances(entry.getKey(), entry.getValue()); - } - } - } - - /** * Returns the execution graph that this stage belongs to. * * @return the execution graph that this stage belongs to @@ -446,4 +369,37 @@ public final class ExecutionStage { } } } + + public int getMaxNumberSubtasks(){ + int maxDegree = 0; + + for(int i =0; i < this.getNumberOfStageMembers(); i++){ + final ExecutionGroupVertex groupVertex = this.getStageMember(i); + + if(groupVertex.getCurrentNumberOfGroupMembers() > maxDegree){ + maxDegree = groupVertex.getCurrentNumberOfGroupMembers(); + } + } + + return maxDegree; + } + + public int getRequiredSlots(){ + Set instanceSet = new HashSet(); + + for(int i=0; i< this.getNumberOfStageMembers(); i++){ + final ExecutionGroupVertex groupVertex = this.getStageMember(i); + + final Iterator vertexIterator = groupVertex.iterator(); + + while(vertexIterator.hasNext()){ + final ExecutionVertex vertex = vertexIterator.next(); + + instanceSet.add(vertex.getAllocatedResource().getInstance()); + } + + } + + return instanceSet.size(); + } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java index 8e9395a..1e8d538 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ExecutionVertex.java @@ -855,7 +855,6 @@ public final class ExecutionVertex { * false/ otherwise */ public boolean decrementRetriesLeftAndCheck() { - return (this.retriesLeft.decrementAndGet() > 0); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/InternalJobStatus.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/InternalJobStatus.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/InternalJobStatus.java index 3a41aa2..8565495 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/InternalJobStatus.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/InternalJobStatus.java @@ -74,6 +74,7 @@ public enum InternalJobStatus { * the internal job status to converted. * @return the corresponding job status or null if no corresponding job status exists */ + @SuppressWarnings("incomplete-switch") public static JobStatus toJobStatus(InternalJobStatus status) { switch (status) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ManagementGraphFactory.java ---------------------------------------------------------------------- diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ManagementGraphFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ManagementGraphFactory.java index 72e3651..04c68b1 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ManagementGraphFactory.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/executiongraph/ManagementGraphFactory.java @@ -17,8 +17,8 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import eu.stratosphere.nephele.instance.AbstractInstance; import eu.stratosphere.runtime.io.channels.ChannelType; +import eu.stratosphere.nephele.instance.Instance; import eu.stratosphere.nephele.managementgraph.ManagementEdge; import eu.stratosphere.nephele.managementgraph.ManagementEdgeID; import eu.stratosphere.nephele.managementgraph.ManagementGate; @@ -120,12 +120,11 @@ public class ManagementGraphFactory { final ExecutionVertex ev = iterator.next(); final ManagementGroupVertex parent = groupMap.get(ev.getGroupVertex()); - final AbstractInstance instance = ev.getAllocatedResource().getInstance(); + final Instance instance = ev.getAllocatedResource().getInstance(); final ManagementVertex managementVertex = new ManagementVertex( parent, ev.getID().toManagementVertexID(), - (instance.getInstanceConnectionInfo() != null) ? instance.getInstanceConnectionInfo().toString() : instance.toString(), - instance.getType().toString(), + (instance.getInstanceConnectionInfo() != null) ? instance.getInstanceConnectionInfo().toString() : instance.toString(), ev.getIndexInVertexGroup() ); managementVertex.setExecutionState(ev.getExecutionState());