Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3FFF311CBB for ; Thu, 26 Jun 2014 09:48:19 +0000 (UTC) Received: (qmail 27365 invoked by uid 500); 26 Jun 2014 09:48:19 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 27343 invoked by uid 500); 26 Jun 2014 09:48:19 -0000 Mailing-List: contact commits-help@flink.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.incubator.apache.org Delivered-To: mailing list commits@flink.incubator.apache.org Received: (qmail 27333 invoked by uid 99); 26 Jun 2014 09:48:19 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Jun 2014 09:48:19 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 26 Jun 2014 09:46:54 +0000 Received: (qmail 23321 invoked by uid 99); 26 Jun 2014 09:46:27 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Jun 2014 09:46:27 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id E76448349E9; Thu, 26 Jun 2014 09:46:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rmetzger@apache.org To: commits@flink.incubator.apache.org Date: Thu, 26 Jun 2014 09:46:36 -0000 Message-Id: <790b2c3897d644b68a86583ecab30d70@git.apache.org> In-Reply-To: <082fb8975e2c47f2b349d6589076af26@git.apache.org> References: <082fb8975e2c47f2b349d6589076af26@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [11/53] 
[abbrv] git commit: Rework the Taskmanager to a slot based model and remove legacy cloud code X-Virus-Checked: Checked by ClamAV on apache.org Rework the Taskmanager to a slot based model and remove legacy cloud code Squashed commit of the following: - Post merge cleanup - Renamed fractionMemory into memoryFraction. - Removed Local and QueueScheduler and replaced it instead with a unified DefaultScheduler. - Removed Local and ClusterManager and inserted instead a unified DefaultInstanceManager. - Removed connection IDs from execution edges - Removed InstanceType, InstanceRequestMap, InstanceTypeDescription, InstanceTypeDescriptionTypeFactory, PendingRequestsMap - Fixed problems with test cases. - introduced simple slot system for scheduling. - Removed subtasks per instance - Added registerTaskManager to the JobManager RPC calls. RegisterTaskManager is called only once where the hardware description information is sent. Add: Merging cloudmodel remove with new network stack Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/86d206c4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/86d206c4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/86d206c4 Branch: refs/heads/travis_test Commit: 86d206c41922a1b7b8c2839b65d3568f9be55e0c Parents: 7b6b5a2 Author: Till Rohrmann Authored: Sun Jun 1 16:03:27 2014 +0200 Committer: Stephan Ewen Committed: Sun Jun 22 21:07:10 2014 +0200 ---------------------------------------------------------------------- .../api/avro/AvroExternalJarProgramITCase.java | 1 + .../eu/stratosphere/client/LocalExecutor.java | 9 + .../client/minicluster/NepheleMiniCluster.java | 23 +- .../eu/stratosphere/client/program/Client.java | 5 +- .../client/CliFrontendListCancelTest.java | 11 +- .../stratosphere/client/testjar/WordCount.java | 3 +- .../eu/stratosphere/compiler/PactCompiler.java | 473 +--------- 
.../compiler/costs/DefaultCostEstimator.java | 18 +- .../dag/AbstractPartialSolutionNode.java | 5 - .../compiler/dag/BinaryUnionNode.java | 48 +- .../compiler/dag/BulkIterationNode.java | 20 +- .../stratosphere/compiler/dag/DataSinkNode.java | 18 +- .../compiler/dag/DataSourceNode.java | 16 - .../compiler/dag/GroupReduceNode.java | 1 - .../compiler/dag/OptimizerNode.java | 51 +- .../stratosphere/compiler/dag/ReduceNode.java | 1 - .../compiler/dag/SingleInputNode.java | 41 +- .../stratosphere/compiler/dag/SinkJoiner.java | 1 - .../stratosphere/compiler/dag/TwoInputNode.java | 56 +- .../compiler/dag/WorksetIterationNode.java | 12 +- .../RequestedGlobalProperties.java | 8 +- .../AllGroupWithPartialPreGroupProperties.java | 3 +- .../compiler/operators/AllReduceProperties.java | 3 +- .../GroupReduceWithCombineProperties.java | 6 +- .../operators/PartialGroupProperties.java | 6 +- .../compiler/operators/ReduceProperties.java | 3 +- .../eu/stratosphere/compiler/plan/Channel.java | 59 +- .../eu/stratosphere/compiler/plan/PlanNode.java | 25 +- .../plandump/PlanJSONDumpGenerator.java | 3 - .../plantranslate/NepheleJobGraphGenerator.java | 80 +- .../pact/compiler/CompilerTestBase.java | 24 +- .../configuration/ConfigConstants.java | 12 +- .../java/eu/stratosphere/util/ClassUtils.java | 1 + .../event/job/VertexAssignmentEvent.java | 32 +- .../nephele/executiongraph/ExecutionEdge.java | 9 - .../nephele/executiongraph/ExecutionGraph.java | 181 ++-- .../executiongraph/ExecutionGroupVertex.java | 184 +--- .../nephele/executiongraph/ExecutionStage.java | 112 +-- .../nephele/executiongraph/ExecutionVertex.java | 1 - .../executiongraph/InternalJobStatus.java | 1 + .../executiongraph/ManagementGraphFactory.java | 7 +- .../nephele/instance/AbstractInstance.java | 297 ------ .../nephele/instance/AllocatedResource.java | 38 +- .../nephele/instance/AllocatedSlot.java | 65 ++ .../nephele/instance/AllocationID.java | 4 +- .../instance/DefaultInstanceManager.java | 393 ++++++++ 
.../nephele/instance/DummyInstance.java | 14 +- .../stratosphere/nephele/instance/Hardware.java | 24 + .../stratosphere/nephele/instance/Instance.java | 362 +++++++ .../nephele/instance/InstanceManager.java | 145 +-- .../nephele/instance/InstanceNotifier.java | 71 ++ .../nephele/instance/InstanceRequestMap.java | 184 ---- .../nephele/instance/InstanceType.java | 199 ---- .../instance/InstanceTypeDescription.java | 137 --- .../InstanceTypeDescriptionFactory.java | 46 - .../nephele/instance/InstanceTypeFactory.java | 91 -- .../nephele/instance/LocalInstanceManager.java | 60 ++ .../instance/cluster/AllocatedSlice.java | 120 --- .../instance/cluster/ClusterInstance.java | 181 ---- .../cluster/ClusterInstanceNotifier.java | 71 -- .../instance/cluster/ClusterManager.java | 945 ------------------- .../instance/cluster/PendingRequestsMap.java | 97 -- .../nephele/instance/local/LocalInstance.java | 37 - .../instance/local/LocalInstanceManager.java | 418 -------- .../instance/local/LocalInstanceNotifier.java | 70 -- .../nephele/jobgraph/AbstractJobVertex.java | 100 +- .../nephele/jobmanager/DeploymentManager.java | 8 +- .../nephele/jobmanager/EventCollector.java | 10 +- .../nephele/jobmanager/JobManager.java | 98 +- .../nephele/jobmanager/JobManagerUtils.java | 54 +- .../scheduler/AbstractExecutionListener.java | 166 ---- .../jobmanager/scheduler/AbstractScheduler.java | 662 ------------- .../scheduler/DefaultExecutionListener.java | 127 +++ .../jobmanager/scheduler/DefaultScheduler.java | 762 +++++++++++++++ .../jobmanager/scheduler/RecoveryLogic.java | 248 ----- .../scheduler/local/LocalExecutionListener.java | 33 - .../scheduler/local/LocalScheduler.java | 213 ----- .../scheduler/queue/QueueExecutionListener.java | 40 - .../scheduler/queue/QueueScheduler.java | 216 ----- .../splitassigner/InputSplitManager.java | 2 +- .../LocatableInputSplitAssigner.java | 4 +- .../splitassigner/LocatableInputSplitList.java | 20 +- .../file/FileInputSplitAssigner.java | 4 +- 
.../splitassigner/file/FileInputSplitList.java | 20 +- .../managementgraph/ManagementGraph.java | 4 +- .../managementgraph/ManagementVertex.java | 35 +- .../eu/stratosphere/nephele/net/NetUtils.java | 2 + .../profiling/impl/JobProfilingData.java | 6 +- .../protocols/ExtendedManagementProtocol.java | 23 +- .../nephele/protocols/JobManagerProtocol.java | 19 +- .../services/iomanager/ChannelAccess.java | 1 + .../services/memorymanager/MemoryManager.java | 17 +- .../memorymanager/spi/DefaultMemoryManager.java | 39 +- .../nephele/taskmanager/TaskManager.java | 123 ++- .../RegisterTaskManagerResult.java | 50 + .../nephele/topology/NetworkNode.java | 10 - .../eu/stratosphere/nephele/util/IOUtils.java | 1 + .../pact/runtime/cache/FileCache.java | 9 +- .../hash/BuildFirstHashMatchIterator.java | 8 +- .../BuildFirstReOpenableHashMatchIterator.java | 8 +- .../hash/BuildSecondHashMatchIterator.java | 8 +- .../pact/runtime/hash/InMemoryPartition.java | 2 + .../iterative/task/IterationHeadPactTask.java | 5 +- .../pact/runtime/shipping/ShipStrategyType.java | 23 +- .../runtime/sort/AsynchronousPartialSorter.java | 11 +- .../AsynchronousPartialSorterCollector.java | 7 +- .../sort/CombiningUnilateralSortMerger.java | 18 +- .../pact/runtime/sort/UnilateralSortMerger.java | 18 +- .../AbstractCachedBuildSideMatchDriver.java | 2 +- .../pact/runtime/task/CrossDriver.java | 3 +- .../pact/runtime/task/DataSinkTask.java | 2 +- .../runtime/task/GroupReduceCombineDriver.java | 4 +- .../pact/runtime/task/MatchDriver.java | 38 +- .../pact/runtime/task/ReduceCombineDriver.java | 3 +- .../pact/runtime/task/RegularPactTask.java | 12 +- .../SynchronousChainedCombineDriver.java | 2 +- .../pact/runtime/task/util/TaskConfig.java | 68 +- .../runtime/io/channels/InputChannel.java | 9 +- .../runtime/io/gates/InputGate.java | 2 + .../runtime/io/network/RemoteReceiver.java | 20 +- .../nephele/event/job/ManagementEventTest.java | 4 +- .../executiongraph/ExecutionGraphTest.java | 258 +---- 
.../instance/cluster/ClusterManagerTest.java | 273 ------ .../cluster/ClusterManagerTestUtils.java | 66 -- .../cluster/DefaultInstanceManagerTest.java | 232 +++++ .../DefaultInstanceManagerTestUtils.java | 66 ++ .../instance/cluster/HostInClusterTest.java | 130 ++- .../cluster/PendingRequestsMapTest.java | 91 -- .../local/LocalInstanceManagerTest.java | 17 +- .../nephele/jobmanager/JobManagerITCase.java | 16 +- .../scheduler/queue/DefaultSchedulerTest.java | 185 ++++ .../scheduler/queue/QueueSchedulerTest.java | 186 ---- .../scheduler/queue/TestDeploymentManager.java | 4 +- .../scheduler/queue/TestInstanceManager.java | 118 +-- .../managementgraph/ManagementGraphTest.java | 11 +- .../services/iomanager/IOManagerITCase.java | 2 +- .../IOManagerPerformanceBenchmark.java | 2 +- .../services/iomanager/IOManagerTest.java | 2 +- .../memorymanager/MemorySegmentTest.java | 2 +- .../nephele/util/ServerTestUtils.java | 17 +- .../runtime/hash/HashMatchIteratorITCase.java | 14 +- .../pact/runtime/hash/HashTableITCase.java | 2 +- .../runtime/hash/ReOpenableHashTableITCase.java | 4 +- .../pact/runtime/io/ChannelViewsTest.java | 8 +- .../pact/runtime/io/SpillingBufferTest.java | 2 +- .../event/EventWithAggregatorsTest.java | 2 + .../resettable/BlockResettableIteratorTest.java | 2 +- ...lockResettableMutableObjectIteratorTest.java | 2 +- .../sort/AsynchonousPartialSorterITCase.java | 14 +- .../CombiningUnilateralSortMergerITCase.java | 8 +- .../pact/runtime/sort/ExternalSortITCase.java | 12 +- .../sort/MassiveStringSortingITCase.java | 4 +- .../sort/SortMergeMatchIteratorITCase.java | 2 +- .../runtime/task/CombineTaskExternalITCase.java | 8 +- .../pact/runtime/task/CombineTaskTest.java | 10 +- .../runtime/task/CrossTaskExternalITCase.java | 7 +- .../pact/runtime/task/CrossTaskTest.java | 36 +- .../pact/runtime/task/DataSinkTaskTest.java | 47 +- .../runtime/task/MatchTaskExternalITCase.java | 14 +- .../pact/runtime/task/MatchTaskTest.java | 56 +- 
.../runtime/task/ReduceTaskExternalITCase.java | 8 +- .../pact/runtime/task/ReduceTaskTest.java | 3 +- .../runtime/task/chaining/ChainTaskTest.java | 19 +- .../task/drivers/ReduceCombineDriverTest.java | 10 +- .../runtime/task/drivers/TestTaskContext.java | 2 +- .../pact/runtime/test/util/DriverTestBase.java | 8 +- .../pact/runtime/test/util/MockEnvironment.java | 9 +- .../netty/InboundEnvelopeDecoderTest.java | 2 +- .../test/compiler/util/CompilerTestBase.java | 26 +- .../test/util/AbstractTestBase.java | 48 +- .../test/util/JavaProgramTestBase.java | 2 + .../test/util/RecordAPITestBase.java | 3 + .../test/accumulators/AccumulatorITCase.java | 7 +- .../BroadcastVarsNepheleITCase.java | 16 +- .../KMeansIterativeNepheleITCase.java | 30 +- .../test/cancelling/CancellingTestBase.java | 10 +- .../test/cancelling/MapCancelingITCase.java | 13 +- .../cancelling/MatchJoinCancelingITCase.java | 17 +- .../clients/examples/LocalExecutorITCase.java | 10 +- .../exampleJavaPrograms/WordCountITCase.java | 4 +- .../ComputeEdgeDegreesITCase.java | 2 +- .../ConnectedComponentsITCase.java | 2 +- .../EnumTrianglesOnEdgesWithDegreesITCase.java | 2 +- .../TransitiveClosureNaiveITCase.java | 2 +- .../WebLogAnalysisITCase.java | 2 +- .../exampleScalaPrograms/WordCountITCase.java | 2 +- .../WordCountPactValueITCase.java | 2 +- .../WordCountWithCountFunctionITCase.java | 2 +- .../test/failingPrograms/TaskFailureITCase.java | 8 +- .../CoGroupConnectedComponentsITCase.java | 6 +- .../iterative/ConnectedComponentsITCase.java | 6 +- ...ectedComponentsWithDeferredUpdateITCase.java | 3 +- ...tedComponentsWithSolutionSetFirstITCase.java | 7 +- .../test/iterative/DanglingPageRankITCase.java | 3 +- .../test/iterative/DeltaPageRankITCase.java | 3 +- .../DependencyConnectedComponentsITCase.java | 5 +- ...IterationTerminationWithTerminationTail.java | 6 +- .../IterationTerminationWithTwoTails.java | 6 +- .../IterationWithAllReducerITCase.java | 6 +- .../iterative/IterationWithChainingITCase.java | 3 +- 
.../iterative/IterationWithUnionITCase.java | 3 +- .../test/iterative/IterativeKMeansITCase.java | 6 +- .../test/iterative/KMeansITCase.java | 8 +- .../test/iterative/LineRankITCase.java | 5 +- .../test/iterative/PageRankITCase.java | 3 +- .../ConnectedComponentsNepheleITCase.java | 54 +- .../nephele/DanglingPageRankNepheleITCase.java | 7 +- ...nglingPageRankWithCombinerNepheleITCase.java | 7 +- .../IterationWithChainingNepheleITCase.java | 17 +- .../test/iterative/nephele/JobGraphUtils.java | 20 +- .../CustomCompensatableDanglingPageRank.java | 57 +- ...mpensatableDanglingPageRankWithCombiner.java | 59 +- .../CompensatableDanglingPageRank.java | 55 +- .../PackagedProgramEndToEndITCase.java | 15 +- .../test/operators/UnionSinkITCase.java | 3 +- .../recordJobTests/CollectionSourceTest.java | 8 +- .../ComputeEdgeDegreesITCase.java | 3 +- .../EnumTrianglesOnEdgesWithDegreesITCase.java | 3 +- .../recordJobTests/EnumTrianglesRDFITCase.java | 4 +- .../recordJobTests/GlobalSortingITCase.java | 5 +- .../GlobalSortingMixedOrderITCase.java | 62 +- .../recordJobTests/GroupOrderReduceITCase.java | 3 +- .../recordJobTests/MergeOnlyJoinITCase.java | 1 + .../test/recordJobTests/PairwiseSPITCase.java | 4 +- .../test/recordJobTests/TPCHQuery10ITCase.java | 2 +- .../test/recordJobTests/TPCHQuery3ITCase.java | 3 +- .../TPCHQuery3WithUnionITCase.java | 6 +- .../test/recordJobTests/TPCHQuery4ITCase.java | 6 +- .../test/recordJobTests/TPCHQuery9ITCase.java | 6 +- .../recordJobTests/TPCHQueryAsterixITCase.java | 6 +- .../test/recordJobTests/TeraSortITCase.java | 7 +- .../recordJobTests/WebLogAnalysisITCase.java | 6 +- .../test/recordJobTests/WordCountITCase.java | 6 +- .../WordCountUnionReduceITCase.java | 6 +- .../test/runtime/NetworkStackThroughput.java | 49 +- 235 files changed, 3917 insertions(+), 7900 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java ---------------------------------------------------------------------- diff --git a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java index a766fcb..e398acf 100644 --- a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java +++ b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java @@ -47,6 +47,7 @@ public class AvroExternalJarProgramITCase { try { testMiniCluster = new NepheleMiniCluster(); testMiniCluster.setJobManagerRpcPort(TEST_JM_PORT); + testMiniCluster.setTaskManagerNumSlots(4); testMiniCluster.start(); String jarFile = JAR_FILE; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java ---------------------------------------------------------------------- diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java index 89f996a..b017220 100644 --- a/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java +++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/LocalExecutor.java @@ -42,6 +42,8 @@ public class LocalExecutor extends PlanExecutor { private static boolean DEFAULT_OVERWRITE = false; + private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1; + private final Object lock = new Object(); // we lock to ensure singleton execution private NepheleMiniCluster nephele; @@ -54,6 +56,8 @@ public class LocalExecutor extends PlanExecutor { private int taskManagerDataPort = -1; + private int taskManagerNumSlots = 
DEFAULT_TASK_MANAGER_NUM_SLOTS; + private String configDir; private String hdfsConfigFile; @@ -129,6 +133,10 @@ public class LocalExecutor extends PlanExecutor { public void setDefaultAlwaysCreateDirectory(boolean defaultAlwaysCreateDirectory) { this.defaultAlwaysCreateDirectory = defaultAlwaysCreateDirectory; } + + public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; } + + public int getTaskManagerNumSlots() { return this.taskManagerNumSlots; } // -------------------------------------------------------------------------------------------- @@ -157,6 +165,7 @@ public class LocalExecutor extends PlanExecutor { } nephele.setDefaultOverwriteFiles(defaultOverwriteFiles); nephele.setDefaultAlwaysCreateDirectory(defaultAlwaysCreateDirectory); + nephele.setTaskManagerNumSlots(taskManagerNumSlots); // start it up this.nephele.start(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java ---------------------------------------------------------------------- diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java index 79e5c64..4daca26 100644 --- a/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java +++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java @@ -46,6 +46,8 @@ public class NepheleMiniCluster { private static final boolean DEFAULT_LAZY_MEMORY_ALLOCATION = true; + private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = -1; + // -------------------------------------------------------------------------------------------- private final Object startStopLock = new Object(); @@ -56,7 +58,9 @@ public class NepheleMiniCluster { private int taskManagerDataPort = 
DEFAULT_TM_DATA_PORT; - private int numTaskManager = DEFAULT_NUM_TASK_MANAGER; + private int numTaskTracker = DEFAULT_NUM_TASK_MANAGER; + + private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS; private long memorySize = DEFAULT_MEMORY_SIZE; @@ -149,9 +153,13 @@ public class NepheleMiniCluster { this.defaultAlwaysCreateDirectory = defaultAlwaysCreateDirectory; } - public void setNumTaskManager(int numTaskManager) { this.numTaskManager = numTaskManager; } + public void setNumTaskTracker(int numTaskTracker) { this.numTaskTracker = numTaskTracker; } + + public int getNumTaskTracker() { return numTaskTracker; } - public int getNumTaskManager() { return numTaskManager; } + public void setTaskManagerNumSlots(int taskManagerNumSlots) { this.taskManagerNumSlots = taskManagerNumSlots; } + + public int getTaskManagerNumSlots() { return taskManagerNumSlots; } // ------------------------------------------------------------------------ // Life cycle and Job Submission @@ -172,7 +180,7 @@ public class NepheleMiniCluster { } else { Configuration conf = getMiniclusterDefaultConfig(jobManagerRpcPort, taskManagerRpcPort, taskManagerDataPort, memorySize, hdfsConfigFile, lazyMemoryAllocation, defaultOverwriteFiles, - defaultAlwaysCreateDirectory, numTaskManager); + defaultAlwaysCreateDirectory, taskManagerNumSlots, numTaskTracker); GlobalConfiguration.includeConfiguration(conf); } @@ -196,7 +204,7 @@ public class NepheleMiniCluster { // start the job manager jobManager = new JobManager(ExecutionMode.LOCAL); - waitForJobManagerToBecomeReady(numTaskManager); + waitForJobManagerToBecomeReady(numTaskTracker); } } @@ -236,7 +244,8 @@ public class NepheleMiniCluster { public static Configuration getMiniclusterDefaultConfig(int jobManagerRpcPort, int taskManagerRpcPort, int taskManagerDataPort, long memorySize, String hdfsConfigFile, boolean lazyMemory, - boolean defaultOverwriteFiles, boolean defaultAlwaysCreateDirectory, int numTaskManager) + boolean defaultOverwriteFiles, 
boolean defaultAlwaysCreateDirectory, + int taskManagerNumSlots, int numTaskManager) { final Configuration config = new Configuration(); @@ -284,6 +293,8 @@ public class NepheleMiniCluster { config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize/numTaskManager); config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManager); + + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots); return config; } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java ---------------------------------------------------------------------- diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java index 00790f4..31138f6 100644 --- a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java +++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java @@ -77,7 +77,7 @@ public class Client { configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress.getAddress().getHostAddress()); configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerAddress.getPort()); - this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator(), jobManagerAddress); + this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator()); // Disable Local Execution when using a Client ContextEnvironment.disableLocalExecution(); @@ -104,8 +104,7 @@ public class Client { throw new CompilerException("Cannot find port to job manager's RPC service in the global configuration."); } - final InetSocketAddress jobManagerAddress = new InetSocketAddress(address, port); - this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator(), jobManagerAddress); + this.compiler = new PactCompiler(new 
DataStatistics(), new DefaultCostEstimator()); // Disable Local Execution when using a Client ContextEnvironment.disableLocalExecution(); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendListCancelTest.java ---------------------------------------------------------------------- diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendListCancelTest.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendListCancelTest.java index 7ccd420..ba02fa9 100644 --- a/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendListCancelTest.java +++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendListCancelTest.java @@ -21,7 +21,6 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import org.apache.commons.cli.CommandLine; import org.junit.Assert; @@ -34,8 +33,6 @@ import eu.stratosphere.nephele.client.JobProgressResult; import eu.stratosphere.nephele.client.JobSubmissionResult; import eu.stratosphere.nephele.event.job.AbstractEvent; import eu.stratosphere.nephele.event.job.RecentJobEvent; -import eu.stratosphere.nephele.instance.InstanceType; -import eu.stratosphere.nephele.instance.InstanceTypeDescription; import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.nephele.jobgraph.JobID; import eu.stratosphere.nephele.managementgraph.ManagementGraph; @@ -202,18 +199,18 @@ public class CliFrontendListCancelTest { } @Override - public Map getMapOfAvailableInstanceTypes() throws IOException { + public void logBufferUtilization(JobID jobID) throws IOException { throw new UnsupportedOperationException(); } @Override - public void logBufferUtilization(JobID jobID) throws IOException { + public NetworkTopology getNetworkTopology(JobID jobID) throws IOException { throw new UnsupportedOperationException(); } 
@Override - public NetworkTopology getNetworkTopology(JobID jobID) throws IOException { - throw new UnsupportedOperationException(); + public int getAvailableSlots() { + return 1; } } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java ---------------------------------------------------------------------- diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java index e827805..5218dc2 100644 --- a/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java +++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java @@ -70,9 +70,10 @@ public class WordCount { * FlatMapFunction. The function takes a line (String) and splits it into * multiple pairs in the form of "(word,1)" (Tuple2). */ - @SuppressWarnings("serial") public static final class Tokenizer extends FlatMapFunction> { + private static final long serialVersionUID = 1L; + @Override public void flatMap(String value, Collector> out) { // normalize and split the line http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java index 2076902..bf3d6af 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java @@ -13,8 +13,6 @@ package eu.stratosphere.compiler; -import java.io.IOException; -import java.net.InetSocketAddress; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Deque; @@ 
-90,11 +88,6 @@ import eu.stratosphere.compiler.postpass.OptimizerPostPass; import eu.stratosphere.configuration.ConfigConstants; import eu.stratosphere.configuration.Configuration; import eu.stratosphere.configuration.GlobalConfiguration; -import eu.stratosphere.nephele.instance.InstanceType; -import eu.stratosphere.nephele.instance.InstanceTypeDescription; -import eu.stratosphere.nephele.ipc.RPC; -import eu.stratosphere.nephele.net.NetUtils; -import eu.stratosphere.nephele.protocols.ExtendedManagementProtocol; import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; import eu.stratosphere.pact.runtime.task.util.LocalStrategy; import eu.stratosphere.util.InstantiationUtil; @@ -340,24 +333,10 @@ public class PactCompiler { private final CostEstimator costEstimator; /** - * The connection used to connect to the job-manager. - */ - private final InetSocketAddress jobManagerAddress; - - /** - * The maximum number of machines (instances) to use, per the configuration. - */ - private int maxMachines; - - /** * The default degree of parallelism for jobs compiled by this compiler. */ private int defaultDegreeOfParallelism; - /** - * The maximum number of subtasks that should share an instance. - */ - private int maxIntraNodeParallelism; // ------------------------------------------------------------------------ // Constructor & Setup @@ -420,106 +399,29 @@ public class PactCompiler { * The CostEstimator to use to cost the individual operations. */ public PactCompiler(DataStatistics stats, CostEstimator estimator) { - this(stats, estimator, null); - } - - /** - * Creates a new compiler instance that uses the statistics object to determine properties about the input. - * Given those statistics, the compiler can make better choices for the execution strategies. - * as if no filesystem was given. It uses the given cost estimator to compute the costs of the individual - * operations. - *

- * The given socket-address is used to connect to the job manager to obtain system characteristics, like available - * memory. If that parameter is null, then the address is obtained from the global configuration. - * - * @param stats - * The statistics to be used to determine the input properties. - * @param estimator - * The CostEstimator to use to cost the individual operations. - * @param jobManagerConnection - * The address of the job manager that is queried for system characteristics. - */ - public PactCompiler(DataStatistics stats, CostEstimator estimator, InetSocketAddress jobManagerConnection) { this.statistics = stats; this.costEstimator = estimator; Configuration config = GlobalConfiguration.getConfiguration(); - // determine the maximum number of instances to use - this.maxMachines = -1; - // determine the default parallelization degree this.defaultDegreeOfParallelism = config.getInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY, ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE); - - // determine the default intra-node parallelism - int maxInNodePar = config.getInteger(ConfigConstants.PARALLELIZATION_MAX_INTRA_NODE_DEGREE_KEY, - ConfigConstants.DEFAULT_MAX_INTRA_NODE_PARALLELIZATION_DEGREE); - if (maxInNodePar == 0 || maxInNodePar < -1) { - LOG.error("Invalid maximum degree of intra-node parallelism: " + maxInNodePar + - ". 
Ignoring parameter."); - maxInNodePar = ConfigConstants.DEFAULT_MAX_INTRA_NODE_PARALLELIZATION_DEGREE; - } - this.maxIntraNodeParallelism = maxInNodePar; - - // assign the connection to the job-manager - if (jobManagerConnection != null) { - this.jobManagerAddress = jobManagerConnection; - } else { - final String address = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); - if (address == null) { - throw new CompilerException( - "Cannot find address to job manager's RPC service in the global configuration."); - } - - final int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, - ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT); - if (port < 0) { - throw new CompilerException( - "Cannot find port to job manager's RPC service in the global configuration."); - } - - this.jobManagerAddress = new InetSocketAddress(address, port); - } } // ------------------------------------------------------------------------ // Getters / Setters // ------------------------------------------------------------------------ - public int getMaxMachines() { - return maxMachines; - } - - public void setMaxMachines(int maxMachines) { - if (maxMachines == -1 || maxMachines > 0) { - this.maxMachines = maxMachines; - } else { - throw new IllegalArgumentException(); - } - } - public int getDefaultDegreeOfParallelism() { return defaultDegreeOfParallelism; } public void setDefaultDegreeOfParallelism(int defaultDegreeOfParallelism) { - if (defaultDegreeOfParallelism == -1 || defaultDegreeOfParallelism > 0) { + if (defaultDegreeOfParallelism > 0) { this.defaultDegreeOfParallelism = defaultDegreeOfParallelism; } else { - throw new IllegalArgumentException(); - } - } - - public int getMaxIntraNodeParallelism() { - return maxIntraNodeParallelism; - } - - public void setMaxIntraNodeParallelism(int maxIntraNodeParallelism) { - if (maxIntraNodeParallelism == -1 || maxIntraNodeParallelism > 0) { - this.maxIntraNodeParallelism = maxIntraNodeParallelism; - } else { 
- throw new IllegalArgumentException(); + throw new IllegalArgumentException("Default parallelism cannot be zero or negative."); } } @@ -550,14 +452,9 @@ public class PactCompiler { // -------------------- try to get the connection to the job manager ---------------------- // --------------------------to obtain instance information -------------------------------- final OptimizerPostPass postPasser = getPostPassFromPlan(program); - return compile(program, getInstanceTypeInfo(), postPasser); - } - - public OptimizedPlan compile(Plan program, InstanceTypeDescription type) throws CompilerException { - final OptimizerPostPass postPasser = getPostPassFromPlan(program); - return compile(program, type, postPasser); + return compile(program, postPasser); } - + /** * Translates the given pact plan in to an OptimizedPlan, where all nodes have their local strategy assigned * and all channels have a shipping strategy assigned. The process goes through several phases: @@ -569,8 +466,6 @@ public class PactCompiler { * * * @param program The program to be translated. - * @param type The instance type to schedule the execution on. Used also to determine the amount of memory - * available to the tasks. * @param postPasser The function to be used for post passing the optimizer's plan and setting the * data type specific serialization routines. * @return The optimized plan. @@ -579,8 +474,8 @@ public class PactCompiler { * Thrown, if the plan is invalid or the optimizer encountered an inconsistent * situation during the compilation process. 
*/ - private OptimizedPlan compile(Plan program, InstanceTypeDescription type, OptimizerPostPass postPasser) throws CompilerException { - if (program == null || type == null || postPasser == null) { + private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser) throws CompilerException { + if (program == null || postPasser == null) { throw new NullPointerException(); } @@ -588,73 +483,14 @@ public class PactCompiler { if (LOG.isDebugEnabled()) { LOG.debug("Beginning compilation of program '" + program.getJobName() + '\''); } - - final String instanceName = type.getInstanceType().getIdentifier(); - - // we subtract some percentage of the memory to accommodate for rounding errors - final long memoryPerInstance = (long) (type.getHardwareDescription().getSizeOfFreeMemory() * 0.96f); - final int numInstances = type.getMaximumNumberOfAvailableInstances(); - - // determine the maximum number of machines to use - int maxMachinesJob = program.getMaxNumberMachines(); - - if (maxMachinesJob < 1) { - maxMachinesJob = this.maxMachines; - } else if (this.maxMachines >= 1) { - // check if the program requested more than the global config allowed - if (maxMachinesJob > this.maxMachines && LOG.isWarnEnabled()) { - LOG.warn("Maximal number of machines specified in program (" + maxMachinesJob - + ") exceeds the maximum number in the global configuration (" + this.maxMachines - + "). Using the global configuration value."); - } - - maxMachinesJob = Math.min(maxMachinesJob, this.maxMachines); - } - - // adjust the maximum number of machines the the number of available instances - if (maxMachinesJob < 1) { - maxMachinesJob = numInstances; - } else if (maxMachinesJob > numInstances) { - maxMachinesJob = numInstances; - if (LOG.isInfoEnabled()) { - LOG.info("Maximal number of machines decreased to " + maxMachinesJob + - " because no more instances are available."); - } - } // set the default degree of parallelism int defaultParallelism = program.getDefaultParallelism() > 0 ? 
program.getDefaultParallelism() : this.defaultDegreeOfParallelism; - - if (this.maxIntraNodeParallelism > 0) { - if (defaultParallelism < 1) { - defaultParallelism = maxMachinesJob * this.maxIntraNodeParallelism; - } - else if (defaultParallelism > maxMachinesJob * this.maxIntraNodeParallelism) { - int oldParallelism = defaultParallelism; - defaultParallelism = maxMachinesJob * this.maxIntraNodeParallelism; - - if (LOG.isInfoEnabled()) { - LOG.info("Decreasing default degree of parallelism from " + oldParallelism + - " to " + defaultParallelism + " to fit a maximum number of " + maxMachinesJob + - " instances with a intra-parallelism of " + this.maxIntraNodeParallelism); - } - } - } else if (defaultParallelism < 1) { - defaultParallelism = maxMachinesJob; - if (LOG.isInfoEnabled()) { - LOG.info("No default parallelism specified. Using default parallelism of " + defaultParallelism + " (One task per instance)"); - } - } // log the output if (LOG.isDebugEnabled()) { - LOG.debug("Using a default degree of parallelism of " + defaultParallelism + - ", a maximum intra-node parallelism of " + this.maxIntraNodeParallelism + '.'); - if (this.maxMachines > 0) { - LOG.debug("The execution is limited to a maximum number of " + maxMachinesJob + " machines."); - } - + LOG.debug("Using a default degree of parallelism of " + defaultParallelism + '.'); } // the first step in the compilation is to create the optimizer plan representation @@ -666,7 +502,7 @@ public class PactCompiler { // 4) It makes estimates about the data volume of the data sources and // propagates those estimates through the plan - GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(maxMachinesJob, defaultParallelism); + GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism); program.accept(graphCreator); // if we have a plan with multiple data sinks, add logical optimizer nodes that have two data-sinks as children @@ -689,8 +525,7 @@ public class PactCompiler { // now that we 
have all nodes created and recorded which ones consume memory, tell the nodes their minimal // guaranteed memory, for further cost estimations. we assume an equal distribution of memory among consumer tasks - rootNode.accept(new IdAndMemoryAndEstimatesVisitor(this.statistics, - graphCreator.getMemoryConsumerCount() == 0 ? 0 : memoryPerInstance / graphCreator.getMemoryConsumerCount())); + rootNode.accept(new IdAndEstimatesVisitor(this.statistics)); // Now that the previous step is done, the next step is to traverse the graph again for the two // steps that cannot directly be performed during the plan enumeration, because we are dealing with DAGs @@ -733,9 +568,8 @@ public class PactCompiler { dp.resolveDeadlocks(bestPlanSinks); // finalize the plan - OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program, memoryPerInstance); - plan.setInstanceTypeName(instanceName); - + OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program); + // swap the binary unions for n-ary unions. this changes no strategies or memory consumers whatsoever, so // we can do this after the plan finalization plan.accept(new BinaryUnionReplacer()); @@ -755,7 +589,7 @@ public class PactCompiler { * from the plan can be traversed. 
*/ public static List createPreOptimizedPlan(Plan program) { - GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(-1, 1); + GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(1); program.accept(graphCreator); return graphCreator.sinks; } @@ -783,22 +617,18 @@ public class PactCompiler { private final List sinks; // all data sink nodes in the optimizer plan - private final int maxMachines; // the maximum number of machines to use - private final int defaultParallelism; // the default degree of parallelism - private int numMemoryConsumers; - private final GraphCreatingVisitor parent; // reference to enclosing creator, in case of a recursive translation private final boolean forceDOP; - private GraphCreatingVisitor(int maxMachines, int defaultParallelism) { - this(null, false, maxMachines, defaultParallelism, null); + private GraphCreatingVisitor(int defaultParallelism) { + this(null, false, defaultParallelism, null); } - private GraphCreatingVisitor(GraphCreatingVisitor parent, boolean forceDOP, int maxMachines, + private GraphCreatingVisitor(GraphCreatingVisitor parent, boolean forceDOP, int defaultParallelism, HashMap, OptimizerNode> closure) { if (closure == null){ con2node = new HashMap, OptimizerNode>(); @@ -807,7 +637,6 @@ public class PactCompiler { } this.sources = new ArrayList(4); this.sinks = new ArrayList(2); - this.maxMachines = maxMachines; this.defaultParallelism = defaultParallelism; this.parent = parent; this.forceDOP = forceDOP; @@ -878,7 +707,6 @@ public class PactCompiler { // catch this for the recursive translation of step functions BulkPartialSolutionNode p = new BulkPartialSolutionNode(holder, containingIterationNode); p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism()); - p.setSubtasksPerInstance(containingIterationNode.getSubtasksPerInstance()); n = p; } else if (c instanceof WorksetPlaceHolder) { @@ -890,7 +718,6 @@ public class PactCompiler { // catch this for the recursive translation of step 
functions WorksetNode p = new WorksetNode(holder, containingIterationNode); p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism()); - p.setSubtasksPerInstance(containingIterationNode.getSubtasksPerInstance()); n = p; } else if (c instanceof SolutionSetPlaceHolder) { @@ -902,18 +729,14 @@ public class PactCompiler { // catch this for the recursive translation of step functions SolutionSetNode p = new SolutionSetNode(holder, containingIterationNode); p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism()); - p.setSubtasksPerInstance(containingIterationNode.getSubtasksPerInstance()); n = p; } else { - throw new IllegalArgumentException("Unknown operator type: " + c.getClass() + " " + c); + throw new IllegalArgumentException("Unknown operator type: " + c); } this.con2node.put(c, n); - // record the potential memory consumption - this.numMemoryConsumers += n.isMemoryConsumer() ? 1 : 0; - // set the parallelism only if it has not been set before. some nodes have a fixed DOP, such as the // key-less reducer (all-reduce) if (n.getDegreeOfParallelism() < 1) { @@ -931,19 +754,6 @@ public class PactCompiler { n.setDegreeOfParallelism(par); } - // check if we need to set the instance sharing accordingly such that - // the maximum number of machines is not exceeded - if (n.getSubtasksPerInstance() < 1) { - int tasksPerInstance = 1; - if (this.maxMachines > 0) { - int p = n.getDegreeOfParallelism(); - tasksPerInstance = (p / this.maxMachines) + (p % this.maxMachines == 0 ? 
0 : 1); - } - - // we group together n tasks per machine, depending on config and the above computed - // value required to obey the maximum number of machines - n.setSubtasksPerInstance(tasksPerInstance); - } return true; } @@ -966,7 +776,7 @@ public class PactCompiler { // first, recursively build the data flow for the step function final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(this, true, - this.maxMachines, iterNode.getDegreeOfParallelism(), closure); + iterNode.getDegreeOfParallelism(), closure); BulkPartialSolutionNode partialSolution = null; @@ -994,9 +804,6 @@ public class PactCompiler { iterNode.setNextPartialSolution(rootOfStepFunction, terminationCriterion); iterNode.setPartialSolution(partialSolution); - // account for the nested memory consumers - this.numMemoryConsumers += recursiveCreator.numMemoryConsumers; - // go over the contained data flow and mark the dynamic path nodes StaticDynamicPathIdentifier identifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight()); rootOfStepFunction.accept(identifier); @@ -1013,7 +820,7 @@ public class PactCompiler { // first, recursively build the data flow for the step function final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(this, true, - this.maxMachines, iterNode.getDegreeOfParallelism(), closure); + iterNode.getDegreeOfParallelism(), closure); // descend from the solution set delta. check that it depends on both the workset // and the solution set. 
If it does depend on both, this descend should create both nodes iter.getSolutionSetDelta().accept(recursiveCreator); @@ -1067,19 +874,12 @@ public class PactCompiler { iterNode.setPartialSolution(solutionSetNode, worksetNode); iterNode.setNextPartialSolution(solutionSetDeltaNode, nextWorksetNode); - // account for the nested memory consumers - this.numMemoryConsumers += recursiveCreator.numMemoryConsumers; - // go over the contained data flow and mark the dynamic path nodes StaticDynamicPathIdentifier pathIdentifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight()); nextWorksetNode.accept(pathIdentifier); iterNode.getSolutionSetDelta().accept(pathIdentifier); } } - - int getMemoryConsumerCount() { - return this.numMemoryConsumers; - } }; private static final class StaticDynamicPathIdentifier implements Visitor { @@ -1107,17 +907,14 @@ public class PactCompiler { * Simple visitor that sets the minimal guaranteed memory per task based on the amount of available memory, * the number of memory consumers, and on the task's degree of parallelism. */ - private static final class IdAndMemoryAndEstimatesVisitor implements Visitor { + private static final class IdAndEstimatesVisitor implements Visitor { private final DataStatistics statistics; - - private final long memoryPerTaskPerInstance; - + private int id = 1; - private IdAndMemoryAndEstimatesVisitor(DataStatistics statistics, long memoryPerTaskPerInstance) { + private IdAndEstimatesVisitor(DataStatistics statistics) { this.statistics = statistics; - this.memoryPerTaskPerInstance = memoryPerTaskPerInstance; } @@ -1128,11 +925,6 @@ public class PactCompiler { return false; } - // assign minimum memory share, for lower bound estimates - final long mem = visitable.isMemoryConsumer() ? 
- this.memoryPerTaskPerInstance / visitable.getSubtasksPerInstance() : 0; - visitable.setMinimalMemoryPerSubTask(mem); - return true; } @@ -1234,8 +1026,6 @@ public class PactCompiler { private final Deque stackOfIterationNodes; - private long memoryPerInstance; // the amount of memory per instance - private int memoryConsumerWeights; // a counter of all memory consumers /** @@ -1248,12 +1038,7 @@ public class PactCompiler { this.stackOfIterationNodes = new ArrayDeque(); } - private OptimizedPlan createFinalPlan(List sinks, String jobName, Plan originalPlan, long memPerInstance) { - if (LOG.isDebugEnabled()) { - LOG.debug("Available memory per instance: " + memPerInstance); - } - - this.memoryPerInstance = memPerInstance; + private OptimizedPlan createFinalPlan(List sinks, String jobName, Plan originalPlan) { this.memoryConsumerWeights = 0; // traverse the graph @@ -1263,44 +1048,36 @@ public class PactCompiler { // assign the memory to each node if (this.memoryConsumerWeights > 0) { - final long memoryPerInstanceAndWeight = this.memoryPerInstance / this.memoryConsumerWeights; - - if (LOG.isDebugEnabled()) { - LOG.debug("Memory per consumer weight: " + memoryPerInstanceAndWeight); - } - for (PlanNode node : this.allNodes) { // assign memory to the driver strategy of the node final int consumerWeight = node.getMemoryConsumerWeight(); if (consumerWeight > 0) { - final long mem = memoryPerInstanceAndWeight * consumerWeight / node.getSubtasksPerInstance(); - node.setMemoryPerSubTask(mem); + final double relativeMem = (double)consumerWeight / this.memoryConsumerWeights; + node.setRelativeMemoryPerSubtask(relativeMem); if (LOG.isDebugEnabled()) { - final long mib = mem >> 20; - LOG.debug("Assigned " + mib + " MiBytes memory to each subtask of " + - node.getPactContract().getName() + " (" + mib * node.getDegreeOfParallelism() + - " MiBytes total.)"); + LOG.debug("Assigned " + relativeMem + " of total memory to each subtask of " + + node.getPactContract().getName() + "."); 
} } // assign memory to the local and global strategies of the channels for (Channel c : node.getInputs()) { if (c.getLocalStrategy().dams()) { - final long mem = memoryPerInstanceAndWeight / node.getSubtasksPerInstance(); - c.setMemoryLocalStrategy(mem); + final double relativeMem = 1.0 / this.memoryConsumerWeights; + c.setRelativeMemoryLocalStrategy(relativeMem); if (LOG.isDebugEnabled()) { - final long mib = mem >> 20; - LOG.debug("Assigned " + mib + " MiBytes memory to each local strategy instance of " + - c + " (" + mib * node.getDegreeOfParallelism() + " MiBytes total.)"); + LOG.debug("Assigned " + relativeMem + " of total memory to each local strategy " + + "instance of " + c + "."); } } if (c.getTempMode() != TempMode.NONE) { - final long mem = memoryPerInstanceAndWeight / node.getSubtasksPerInstance(); - c.setTempMemory(mem); + final double relativeMem = 1.0/ this.memoryConsumerWeights; + c.setRelativeTempMemory(relativeMem); if (LOG.isDebugEnabled()) { - final long mib = mem >> 20; - LOG.debug("Assigned " + mib + " MiBytes memory to each instance of the temp table for " + - c + " (" + mib * node.getDegreeOfParallelism() + " MiBytes total.)"); + LOG.debug("Assigned " + relativeMem + " of total memory to each instance of the temp " + + "table" + + " " + + "for " + c + "."); } } } @@ -1525,182 +1302,4 @@ public class PactCompiler { throw new CompilerException("Class '" + className + "' is not an optimizer post passer.", ccex); } } - - private InstanceTypeDescription getInstanceTypeInfo() { - if (LOG.isDebugEnabled()) { - LOG.debug("Connecting compiler to JobManager to dertermine instance information."); - } - - // create the connection in a separate thread, such that this thread - // can abort, if an unsuccessful connection occurs. 
- Map instances = null; - - JobManagerConnector jmc = new JobManagerConnector(this.jobManagerAddress); - Thread connectorThread = new Thread(jmc, "Compiler - JobManager connector."); - connectorThread.setDaemon(true); - connectorThread.start(); - - // connect and get the result - try { - jmc.waitForCompletion(); - instances = jmc.instances; - if (instances == null) { - throw new NullPointerException("Returned instance map is "); - } - } - catch (IOException e) { - throw new CompilerException(e.getMessage()); - } - catch (Throwable t) { - throw new CompilerException("Cannot connect to the JobManager to determine the available TaskManagers. " - + "Check if the JobManager is running (using the web interface or log files). Reason: " + - t.getMessage(), t); - } - - // determine which type to run on - return getType(instances); - } - - /** - * This utility method picks the instance type to be used for executing programs. - *

- * - * @param types The available types. - * @return The type to be used for scheduling. - * - * @throws CompilerException - * @throws IllegalArgumentException - */ - private InstanceTypeDescription getType(Map types) - throws CompilerException - { - if (types == null || types.size() < 1) { - throw new IllegalArgumentException("No instance type found."); - } - - InstanceTypeDescription retValue = null; - long totalMemory = 0; - int numInstances = 0; - - final Iterator it = types.values().iterator(); - while(it.hasNext()) - { - final InstanceTypeDescription descr = it.next(); - - // skip instances for which no hardware description is available - // this means typically that no - if (descr.getHardwareDescription() == null || descr.getInstanceType() == null) { - continue; - } - - final int curInstances = descr.getMaximumNumberOfAvailableInstances(); - final long curMemory = curInstances * descr.getHardwareDescription().getSizeOfFreeMemory(); - - // get, if first, or if it has more instances and not less memory, or if it has significantly more memory - // and the same number of cores still - if ( (retValue == null) || - (curInstances > numInstances && (int) (curMemory * 1.2f) > totalMemory) || - (curInstances * retValue.getInstanceType().getNumberOfCores() >= numInstances && - (int) (curMemory * 1.5f) > totalMemory) - ) - { - retValue = descr; - numInstances = curInstances; - totalMemory = curMemory; - } - } - - if (retValue == null) { - throw new CompilerException("No instance currently registered at the job-manager. Retry later.\n" + - "If the system has recently started, it may take a few seconds until the instances register."); - } - - return retValue; - } - - /** - * Utility class for an asynchronous connection to the job manager to determine the available instances. 
- */ - private static final class JobManagerConnector implements Runnable { - - private static final long MAX_MILLIS_TO_WAIT = 10000; - - private final InetSocketAddress jobManagerAddress; - - private final Object lock = new Object(); - - private volatile Map instances; - - private volatile Throwable error; - - - private JobManagerConnector(InetSocketAddress jobManagerAddress) { - this.jobManagerAddress = jobManagerAddress; - } - - - public Map waitForCompletion() throws Throwable { - long start = System.currentTimeMillis(); - long remaining = MAX_MILLIS_TO_WAIT; - - if (this.error != null) { - throw this.error; - } - if (this.instances != null) { - return this.instances; - } - - do { - try { - synchronized (this.lock) { - this.lock.wait(remaining); - } - } catch (InterruptedException iex) {} - } - while (this.error == null && this.instances == null && - (remaining = MAX_MILLIS_TO_WAIT + start - System.currentTimeMillis()) > 0); - - if (this.error != null) { - throw this.error; - } - if (this.instances != null) { - return this.instances; - } - - throw new IOException("Could not connect to the JobManager at " + jobManagerAddress + - ". 
Please make sure that the Job Manager is started properly."); - } - - - @Override - public void run() { - ExtendedManagementProtocol jobManagerConnection = null; - - try { - jobManagerConnection = RPC.getProxy(ExtendedManagementProtocol.class, - this.jobManagerAddress, NetUtils.getSocketFactory()); - - this.instances = jobManagerConnection.getMapOfAvailableInstanceTypes(); - if (this.instances == null) { - throw new IOException("Returned instance map was "); - } - } catch (Throwable t) { - this.error = t; - } finally { - // first of all, signal completion - synchronized (this.lock) { - this.lock.notifyAll(); - } - - if (jobManagerConnection != null) { - try { - RPC.stopProxy(jobManagerConnection); - } catch (Throwable t) { - LOG.error("Could not cleanly shut down connection from compiler to job manager,", t); - } - } - jobManagerConnection = null; - } - } - } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java index 058af1a..fde5970 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/costs/DefaultCostEstimator.java @@ -95,14 +95,20 @@ public class DefaultCostEstimator extends CostEstimator { @Override public void addBroadcastCost(EstimateProvider estimates, int replicationFactor, Costs costs) { - // assumption: we need ship the whole data over the network to each node. 
- final long estOutShipSize = estimates.getEstimatedOutputSize(); - if (estOutShipSize <= 0) { - costs.setNetworkCost(Costs.UNKNOWN); + // if our replication factor is negative, we cannot calculate broadcast costs + + if (replicationFactor > 0) { + // assumption: we need ship the whole data over the network to each node. + final long estOutShipSize = estimates.getEstimatedOutputSize(); + if (estOutShipSize <= 0) { + costs.setNetworkCost(Costs.UNKNOWN); + } else { + costs.addNetworkCost(replicationFactor * estOutShipSize); + } + costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * replicationFactor); } else { - costs.addNetworkCost(replicationFactor * estOutShipSize); + costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * 200); } - costs.addHeuristicNetworkCost(HEURISTIC_COST_BASE * replicationFactor * 100); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/AbstractPartialSolutionNode.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/AbstractPartialSolutionNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/AbstractPartialSolutionNode.java index 8fd6f79..2f7cb2b 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/AbstractPartialSolutionNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/AbstractPartialSolutionNode.java @@ -42,11 +42,6 @@ public abstract class AbstractPartialSolutionNode extends OptimizerNode { public abstract IterationNode getIterationNode(); // -------------------------------------------------------------------------------------------- - - @Override - public boolean isMemoryConsumer() { - return false; - } public boolean isOnDynamicPath() { return true; 
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BinaryUnionNode.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BinaryUnionNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BinaryUnionNode.java index 70752b5..50ec01b 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BinaryUnionNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BinaryUnionNode.java @@ -122,20 +122,12 @@ public class BinaryUnionNode extends TwoInputNode { final RequestedLocalProperties noLocalProps = new RequestedLocalProperties(); final int dop = getDegreeOfParallelism(); - final int subPerInstance = getSubtasksPerInstance(); - final int numInstances = dop / subPerInstance + (dop % subPerInstance == 0 ? 0 : 1); final int inDop1 = getFirstPredecessorNode().getDegreeOfParallelism(); - final int inSubPerInstance1 = getFirstPredecessorNode().getSubtasksPerInstance(); - final int inNumInstances1 = inDop1 / inSubPerInstance1 + (inDop1 % inSubPerInstance1 == 0 ? 0 : 1); final int inDop2 = getSecondPredecessorNode().getDegreeOfParallelism(); - final int inSubPerInstance2 = getSecondPredecessorNode().getSubtasksPerInstance(); - final int inNumInstances2 = inDop2 / inSubPerInstance2 + (inDop2 % inSubPerInstance2 == 0 ? 
0 : 1); - - final boolean globalDopChange1 = numInstances != inNumInstances1; - final boolean globalDopChange2 = numInstances != inNumInstances2; - final boolean localDopChange1 = numInstances == inNumInstances1 & subPerInstance != inSubPerInstance1; - final boolean localDopChange2 = numInstances == inNumInstances2 & subPerInstance != inSubPerInstance2; - + + final boolean dopChange1 = dop != inDop1; + final boolean dopChange2 = dop != inDop2; + // enumerate all pairwise combination of the children's plans together with // all possible operator strategy combination @@ -154,15 +146,11 @@ public class BinaryUnionNode extends TwoInputNode { Channel c1 = new Channel(child1, this.input1.getMaterializationMode()); if (this.input1.getShipStrategy() == null) { // free to choose the ship strategy - igps.parameterizeChannel(c1, globalDopChange1, localDopChange1); + igps.parameterizeChannel(c1, dopChange1); // if the DOP changed, make sure that we cancel out properties, unless the // ship strategy preserves/establishes them even under changing DOPs - if (globalDopChange1 && !c1.getShipStrategy().isNetworkStrategy()) { - c1.getGlobalProperties().reset(); - } - if (localDopChange1 && !(c1.getShipStrategy().isNetworkStrategy() || - c1.getShipStrategy().compensatesForLocalDOPChanges())) { + if (dopChange1 && !c1.getShipStrategy().isNetworkStrategy()) { c1.getGlobalProperties().reset(); } } else { @@ -173,10 +161,8 @@ public class BinaryUnionNode extends TwoInputNode { c1.setShipStrategy(this.input1.getShipStrategy()); } - if (globalDopChange1) { + if (dopChange1) { c1.adjustGlobalPropertiesForFullParallelismChange(); - } else if (localDopChange1) { - c1.adjustGlobalPropertiesForLocalParallelismChange(); } } @@ -184,15 +170,11 @@ public class BinaryUnionNode extends TwoInputNode { Channel c2 = new Channel(child2, this.input2.getMaterializationMode()); if (this.input2.getShipStrategy() == null) { // free to choose the ship strategy - igps.parameterizeChannel(c2, globalDopChange2, 
localDopChange2); + igps.parameterizeChannel(c2, dopChange2); // if the DOP changed, make sure that we cancel out properties, unless the // ship strategy preserves/establishes them even under changing DOPs - if (globalDopChange2 && !c2.getShipStrategy().isNetworkStrategy()) { - c2.getGlobalProperties().reset(); - } - if (localDopChange2 && !(c2.getShipStrategy().isNetworkStrategy() || - c2.getShipStrategy().compensatesForLocalDOPChanges())) { + if (dopChange2 && !c2.getShipStrategy().isNetworkStrategy()) { c2.getGlobalProperties().reset(); } } else { @@ -203,10 +185,8 @@ public class BinaryUnionNode extends TwoInputNode { c2.setShipStrategy(this.input2.getShipStrategy()); } - if (globalDopChange2) { + if (dopChange2) { c2.adjustGlobalPropertiesForFullParallelismChange(); - } else if (localDopChange2) { - c2.adjustGlobalPropertiesForLocalParallelismChange(); } } @@ -224,20 +204,20 @@ public class BinaryUnionNode extends TwoInputNode { if (c1.getShipStrategy() == ShipStrategyType.FORWARD && c2.getShipStrategy() != ShipStrategyType.FORWARD) { // adjust c2 to c1 c2 = c2.clone(); - p1.parameterizeChannel(c2,globalDopChange2); + p1.parameterizeChannel(c2,dopChange2); } else if (c2.getShipStrategy() == ShipStrategyType.FORWARD && c1.getShipStrategy() != ShipStrategyType.FORWARD) { // adjust c1 to c2 c1 = c1.clone(); - p2.parameterizeChannel(c1,globalDopChange1); + p2.parameterizeChannel(c1,dopChange1); } else if (c1.getShipStrategy() == ShipStrategyType.FORWARD && c2.getShipStrategy() == ShipStrategyType.FORWARD) { boolean adjustC1 = c1.getEstimatedOutputSize() <= 0 || c2.getEstimatedOutputSize() <= 0 || c1.getEstimatedOutputSize() <= c2.getEstimatedOutputSize(); if (adjustC1) { c2 = c2.clone(); - p1.parameterizeChannel(c2, globalDopChange2); + p1.parameterizeChannel(c2, dopChange2); } else { c1 = c1.clone(); - p2.parameterizeChannel(c1, globalDopChange1); + p2.parameterizeChannel(c1, dopChange1); } } else { // this should never happen, as it implies both realize a 
different strategy, which is http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java index f6720ea..bfbca15 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/BulkIterationNode.java @@ -65,9 +65,9 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode // -------------------------------------------------------------------------------------------- /** - * Creates a new node with a single input for the optimizer plan. + * Creates a new node for the bulk iteration. * - * @param iteration The PACT that the node represents. + * @param iteration The bulk iteration the node represents. 
*/ public BulkIterationNode(BulkIterationBase iteration) { super(iteration); @@ -124,14 +124,12 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode public void setNextPartialSolution(OptimizerNode nextPartialSolution, OptimizerNode terminationCriterion) { // check if the root of the step function has the same DOP as the iteration - if (nextPartialSolution.getDegreeOfParallelism() != getDegreeOfParallelism() || - nextPartialSolution.getSubtasksPerInstance() != getSubtasksPerInstance() ) + if (nextPartialSolution.getDegreeOfParallelism() != getDegreeOfParallelism()) { // add a no-op to the root to express the re-partitioning NoOpNode noop = new NoOpNode(); noop.setDegreeOfParallelism(getDegreeOfParallelism()); - noop.setSubtasksPerInstance(getSubtasksPerInstance()); - + PactConnection noOpConn = new PactConnection(nextPartialSolution, noop); noop.setIncomingConnection(noOpConn); nextPartialSolution.addOutgoingConnection(noOpConn); @@ -198,12 +196,7 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode protected List getPossibleProperties() { return Collections.singletonList(new NoOpDescriptor()); } - - @Override - public boolean isMemoryConsumer() { - return true; - } - + @Override public void computeInterestingPropertiesForInputs(CostEstimator estimator) { final InterestingProperties intProps = getInterestingProperties().clone(); @@ -306,12 +299,11 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode else if (report == FeedbackPropertiesMeetRequirementsReport.NOT_MET) { // attach a no-op node through which we create the properties of the original input Channel toNoOp = new Channel(candidate); - globPropsReq.parameterizeChannel(toNoOp, false, false); + globPropsReq.parameterizeChannel(toNoOp, false); locPropsReq.parameterizeChannel(toNoOp); UnaryOperatorNode rebuildPropertiesNode = new UnaryOperatorNode("Rebuild Partial Solution Properties", FieldList.EMPTY_LIST); 
rebuildPropertiesNode.setDegreeOfParallelism(candidate.getDegreeOfParallelism()); - rebuildPropertiesNode.setSubtasksPerInstance(candidate.getSubtasksPerInstance()); SingleInputPlanNode rebuildPropertiesPlanNode = new SingleInputPlanNode(rebuildPropertiesNode, "Rebuild Partial Solution Properties", toNoOp, DriverStrategy.UNARY_NO_OP); rebuildPropertiesPlanNode.initProperties(toNoOp.getGlobalProperties(), toNoOp.getLocalProperties()); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java index fe823d2..d4f9d67 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSinkNode.java @@ -87,11 +87,6 @@ public class DataSinkNode extends OptimizerNode { } @Override - public boolean isMemoryConsumer() { - return getPactContract().getPartitionOrdering() != null || getPactContract().getLocalOrder() != null; - } - - @Override public List getIncomingConnections() { return Collections.singletonList(this.input); } @@ -194,21 +189,16 @@ public class DataSinkNode extends OptimizerNode { List outputPlans = new ArrayList(); final int dop = getDegreeOfParallelism(); - final int subPerInstance = getSubtasksPerInstance(); final int inDop = getPredecessorNode().getDegreeOfParallelism(); - final int inSubPerInstance = getPredecessorNode().getSubtasksPerInstance(); - final int numInstances = dop / subPerInstance + (dop % subPerInstance == 0 ? 0 : 1); - final int inNumInstances = inDop / inSubPerInstance + (inDop % inSubPerInstance == 0 ? 
0 : 1); - - final boolean globalDopChange = numInstances != inNumInstances; - final boolean localDopChange = numInstances == inNumInstances & subPerInstance != inSubPerInstance; - + + final boolean dopChange = dop != inDop; + InterestingProperties ips = this.input.getInterestingProperties(); for (PlanNode p : subPlans) { for (RequestedGlobalProperties gp : ips.getGlobalProperties()) { for (RequestedLocalProperties lp : ips.getLocalProperties()) { Channel c = new Channel(p); - gp.parameterizeChannel(c, globalDopChange, localDopChange); + gp.parameterizeChannel(c, dopChange); lp.parameterizeChannel(c); c.setRequiredLocalProps(lp); c.setRequiredGlobalProps(gp); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java index 17c11c9..7234420 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/DataSourceNode.java @@ -55,7 +55,6 @@ public class DataSourceNode extends OptimizerNode { if (NonParallelInput.class.isAssignableFrom(pactContract.getUserCodeWrapper().getUserCodeClass())) { setDegreeOfParallelism(1); - setSubtasksPerInstance(1); this.sequentialInput = true; } else { this.sequentialInput = false; @@ -78,27 +77,12 @@ public class DataSourceNode extends OptimizerNode { } @Override - public boolean isMemoryConsumer() { - return false; - } - - - @Override public void setDegreeOfParallelism(int degreeOfParallelism) { // if unsplittable, DOP remains at 1 if (!this.sequentialInput) { super.setDegreeOfParallelism(degreeOfParallelism); } } - - - @Override - public void setSubtasksPerInstance(int 
instancesPerMachine) { - // if unsplittable, DOP remains at 1 - if (!this.sequentialInput) { - super.setSubtasksPerInstance(instancesPerMachine); - } - } @Override public List getIncomingConnections() { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/GroupReduceNode.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/GroupReduceNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/GroupReduceNode.java index 6eb2903..4d7230e 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/GroupReduceNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/GroupReduceNode.java @@ -46,7 +46,6 @@ public class GroupReduceNode extends SingleInputNode { if (this.keys == null) { // case of a key-less reducer. force a parallelism of 1 setDegreeOfParallelism(1); - setSubtasksPerInstance(1); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java index b2c9330..85a6568 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/OptimizerNode.java @@ -262,13 +262,6 @@ public abstract class OptimizerNode implements Visitable, Estimat */ @Override public abstract void accept(Visitor visitor); - - /** - * Checks, whether this node requires memory for its tasks or not. - * - * @return True, if this node contains logic that requires memory usage, false otherwise. 
- */ - public abstract boolean isMemoryConsumer(); /** * Checks whether a field is modified by the user code or whether it is kept unchanged. @@ -408,7 +401,7 @@ public abstract class OptimizerNode implements Visitable, Estimat * @param degreeOfParallelism * The degree of parallelism to set. * @throws IllegalArgumentException - * If the degree of parallelism is smaller than one. + * If the degree of parallelism is smaller than one and not -1. */ public void setDegreeOfParallelism(int degreeOfParallelism) { if (degreeOfParallelism < 1) { @@ -416,48 +409,6 @@ public abstract class OptimizerNode implements Visitable, Estimat } this.degreeOfParallelism = degreeOfParallelism; } - - /** - * Gets the number of parallel instances of the contract that are - * to be executed on the same compute instance (logical machine). - * - * @return The number of subtask instances per machine. - */ - public int getSubtasksPerInstance() { - return this.subtasksPerInstance; - } - - /** - * Sets the number of parallel task instances of the contract that are - * to be executed on the same computing instance (logical machine). - * - * @param instancesPerMachine The instances per machine. - * @throws IllegalArgumentException If the number of instances per machine is smaller than one. - */ - public void setSubtasksPerInstance(int instancesPerMachine) { - if (instancesPerMachine < 1) { - throw new IllegalArgumentException(); - } - this.subtasksPerInstance = instancesPerMachine; - } - - /** - * Gets the minimal guaranteed memory per subtask for tasks represented by this OptimizerNode. - * - * @return The minimal guaranteed memory per subtask, in bytes. - */ - public long getMinimalMemoryPerSubTask() { - return this.minimalMemoryPerSubTask; - } - - /** - * Sets the minimal guaranteed memory per subtask for tasks represented by this OptimizerNode. - * - * @param minimalGuaranteedMemory The minimal guaranteed memory per subtask, in bytes. 
- */ - public void setMinimalMemoryPerSubTask(long minimalGuaranteedMemory) { - this.minimalMemoryPerSubTask = minimalGuaranteedMemory; - } /** * Gets the amount of memory that all subtasks of this task have jointly available. http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/ReduceNode.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/ReduceNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/ReduceNode.java index 2190060..409d027 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/ReduceNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/ReduceNode.java @@ -36,7 +36,6 @@ public class ReduceNode extends SingleInputNode { if (this.keys == null) { // case of a key-less reducer. force a parallelism of 1 setDegreeOfParallelism(1); - setSubtasksPerInstance(1); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java index 8bf3f16..0b872a7 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SingleInputNode.java @@ -206,22 +206,6 @@ public abstract class SingleInputNode extends OptimizerNode { protected abstract List getPossibleProperties(); - - @Override - public boolean isMemoryConsumer() { - for (OperatorDescriptorSingle dps : getPossibleProperties()) { - if (dps.getStrategy().firstDam().isMaterializing()) { - return true; - } - for 
(RequestedLocalProperties rlp : dps.getPossibleLocalProperties()) { - if (!rlp.isTrivial()) { - return true; - } - } - } - return false; - } - @Override public void computeInterestingPropertiesForInputs(CostEstimator estimator) { // get what we inherit and what is preserved by our user code @@ -284,30 +268,21 @@ public abstract class SingleInputNode extends OptimizerNode { final ArrayList outputPlans = new ArrayList(); final int dop = getDegreeOfParallelism(); - final int subPerInstance = getSubtasksPerInstance(); final int inDop = getPredecessorNode().getDegreeOfParallelism(); - final int inSubPerInstance = getPredecessorNode().getSubtasksPerInstance(); - final int numInstances = dop / subPerInstance + (dop % subPerInstance == 0 ? 0 : 1); - final int inNumInstances = inDop / inSubPerInstance + (inDop % inSubPerInstance == 0 ? 0 : 1); - - final boolean globalDopChange = numInstances != inNumInstances; - final boolean localDopChange = numInstances == inNumInstances & subPerInstance != inSubPerInstance; - + + final boolean dopChange = inDop != dop; + // create all candidates for (PlanNode child : subPlans) { if (this.inConn.getShipStrategy() == null) { // pick the strategy ourselves for (RequestedGlobalProperties igps: intGlobal) { final Channel c = new Channel(child, this.inConn.getMaterializationMode()); - igps.parameterizeChannel(c, globalDopChange, localDopChange); + igps.parameterizeChannel(c, dopChange); // if the DOP changed, make sure that we cancel out properties, unless the // ship strategy preserves/establishes them even under changing DOPs - if (globalDopChange && !c.getShipStrategy().isNetworkStrategy()) { - c.getGlobalProperties().reset(); - } - if (localDopChange && !(c.getShipStrategy().isNetworkStrategy() || - c.getShipStrategy().compensatesForLocalDOPChanges())) { + if (dopChange && !c.getShipStrategy().isNetworkStrategy()) { c.getGlobalProperties().reset(); } @@ -332,12 +307,10 @@ public abstract class SingleInputNode extends OptimizerNode { 
c.setShipStrategy(this.inConn.getShipStrategy()); } - if (globalDopChange) { + if (dopChange) { c.adjustGlobalPropertiesForFullParallelismChange(); - } else if (localDopChange) { - c.adjustGlobalPropertiesForLocalParallelismChange(); } - + // check whether we meet any of the accepted properties for (RequestedGlobalProperties rgps: allValidGlobals) { if (rgps.isMetBy(c.getGlobalProperties())) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SinkJoiner.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SinkJoiner.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SinkJoiner.java index 2c765a5..a711ac5 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SinkJoiner.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/SinkJoiner.java @@ -42,7 +42,6 @@ public class SinkJoiner extends TwoInputNode { this.input2 = conn2; setDegreeOfParallelism(1); - setSubtasksPerInstance(1); } @Override http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/86d206c4/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java ---------------------------------------------------------------------- diff --git a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java index 9898c81..97a92d0 100644 --- a/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java +++ b/stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/TwoInputNode.java @@ -251,22 +251,6 @@ public abstract class TwoInputNode extends OptimizerNode { } protected abstract List getPossibleProperties(); - - @Override - public boolean isMemoryConsumer() { - for (OperatorDescriptorDual dpd : 
this.possibleProperties) { - if (dpd.getStrategy().firstDam().isMaterializing() || - dpd.getStrategy().secondDam().isMaterializing()) { - return true; - } - for (LocalPropertiesPair prp : dpd.getPossibleLocalProperties()) { - if (!(prp.getProperties1().isTrivial() && prp.getProperties2().isTrivial())) { - return true; - } - } - } - return false; - } @Override public void computeInterestingPropertiesForInputs(CostEstimator estimator) { @@ -348,20 +332,12 @@ public abstract class TwoInputNode extends OptimizerNode { final ArrayList outputPlans = new ArrayList(); final int dop = getDegreeOfParallelism(); - final int subPerInstance = getSubtasksPerInstance(); - final int numInstances = dop / subPerInstance + (dop % subPerInstance == 0 ? 0 : 1); final int inDop1 = getFirstPredecessorNode().getDegreeOfParallelism(); - final int inSubPerInstance1 = getFirstPredecessorNode().getSubtasksPerInstance(); - final int inNumInstances1 = inDop1 / inSubPerInstance1 + (inDop1 % inSubPerInstance1 == 0 ? 0 : 1); final int inDop2 = getSecondPredecessorNode().getDegreeOfParallelism(); - final int inSubPerInstance2 = getSecondPredecessorNode().getSubtasksPerInstance(); - final int inNumInstances2 = inDop2 / inSubPerInstance2 + (inDop2 % inSubPerInstance2 == 0 ? 
0 : 1); - - final boolean globalDopChange1 = numInstances != inNumInstances1; - final boolean globalDopChange2 = numInstances != inNumInstances2; - final boolean localDopChange1 = numInstances == inNumInstances1 & subPerInstance != inSubPerInstance1; - final boolean localDopChange2 = numInstances == inNumInstances2 & subPerInstance != inSubPerInstance2; - + + final boolean dopChange1 = dop != inDop1; + final boolean dopChange2 = dop != inDop2; + // enumerate all pairwise combination of the children's plans together with // all possible operator strategy combination @@ -380,15 +356,11 @@ public abstract class TwoInputNode extends OptimizerNode { final Channel c1 = new Channel(child1, this.input1.getMaterializationMode()); if (this.input1.getShipStrategy() == null) { // free to choose the ship strategy - igps1.parameterizeChannel(c1, globalDopChange1, localDopChange1); + igps1.parameterizeChannel(c1, dopChange1); // if the DOP changed, make sure that we cancel out properties, unless the // ship strategy preserves/establishes them even under changing DOPs - if (globalDopChange1 && !c1.getShipStrategy().isNetworkStrategy()) { - c1.getGlobalProperties().reset(); - } - if (localDopChange1 && !(c1.getShipStrategy().isNetworkStrategy() || - c1.getShipStrategy().compensatesForLocalDOPChanges())) { + if (dopChange1 && !c1.getShipStrategy().isNetworkStrategy()) { c1.getGlobalProperties().reset(); } } else { @@ -399,10 +371,8 @@ public abstract class TwoInputNode extends OptimizerNode { c1.setShipStrategy(this.input1.getShipStrategy()); } - if (globalDopChange1) { + if (dopChange1) { c1.adjustGlobalPropertiesForFullParallelismChange(); - } else if (localDopChange1) { - c1.adjustGlobalPropertiesForLocalParallelismChange(); } } @@ -411,15 +381,11 @@ public abstract class TwoInputNode extends OptimizerNode { final Channel c2 = new Channel(child2, this.input2.getMaterializationMode()); if (this.input2.getShipStrategy() == null) { // free to choose the ship strategy - 
igps2.parameterizeChannel(c2, globalDopChange2, localDopChange2); + igps2.parameterizeChannel(c2, dopChange2); // if the DOP changed, make sure that we cancel out properties, unless the // ship strategy preserves/establishes them even under changing DOPs - if (globalDopChange2 && !c2.getShipStrategy().isNetworkStrategy()) { - c2.getGlobalProperties().reset(); - } - if (localDopChange2 && !(c2.getShipStrategy().isNetworkStrategy() || - c2.getShipStrategy().compensatesForLocalDOPChanges())) { + if (dopChange2 && !c2.getShipStrategy().isNetworkStrategy()) { c2.getGlobalProperties().reset(); } } else { @@ -430,10 +396,8 @@ public abstract class TwoInputNode extends OptimizerNode { c2.setShipStrategy(this.input2.getShipStrategy()); } - if (globalDopChange2) { + if (dopChange2) { c2.adjustGlobalPropertiesForFullParallelismChange(); - } else if (localDopChange2) { - c2.adjustGlobalPropertiesForLocalParallelismChange(); } }