Return-Path: Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: (qmail 84579 invoked from network); 25 Sep 2009 00:28:33 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 25 Sep 2009 00:28:33 -0000 Received: (qmail 73156 invoked by uid 500); 25 Sep 2009 00:28:33 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 73106 invoked by uid 500); 25 Sep 2009 00:28:33 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 73096 invoked by uid 99); 25 Sep 2009 00:28:33 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 25 Sep 2009 00:28:33 +0000 X-ASF-Spam-Status: No, hits=-1997.2 required=10.0 tests=ALL_TRUSTED,WEIRD_QUOTING X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 25 Sep 2009 00:28:20 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 19279238887A; Fri, 25 Sep 2009 00:28:00 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r818675 [1/4] - in /hadoop/mapreduce/branches/branch-0.21: ./ src/contrib/ src/contrib/mumak/ src/contrib/mumak/bin/ src/contrib/mumak/conf/ src/contrib/mumak/ivy/ src/contrib/mumak/src/ src/contrib/mumak/src/java/ src/contrib/mumak/src/jav... 
Date: Fri, 25 Sep 2009 00:27:59 -0000 To: mapreduce-commits@hadoop.apache.org From: cdouglas@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090925002800.19279238887A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cdouglas Date: Fri Sep 25 00:27:57 2009 New Revision: 818675 URL: http://svn.apache.org/viewvc?rev=818675&view=rev Log: MAPREDUCE-728. Add Mumak, a Hadoop map/reduce simulator. Contributed by Arun C Murthy, Tamas Sarlos, Anirban Dasgupta, Guanying Wang, and Hong Tang Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/bin/ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/bin/mumak.sh hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/build.xml hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/conf/ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/conf/log4j.properties hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/conf/mumak.xml hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/ivy/ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/ivy.xml hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/ivy/libraries.properties hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/AllMapsCompletedTaskAction.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/EagerTaskInitializationListenerAspects.aj 
hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/HeartbeatEvent.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/JobCompleteEvent.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/JobSubmissionEvent.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorClock.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEvent.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEventListener.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEventQueue.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobCache.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobStory.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobStoryProducer.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobTracker.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorLaunchTaskAction.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorTaskTracker.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorTaskTrackerStatus.java 
hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/TaskAttemptCompletionEvent.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/data/ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/data/19-jobs.topology.json.gz (with props) hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/data/19-jobs.trace.json.gz (with props) hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/CheckedEventQueue.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/FakeJobs.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/HeartbeatHelper.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorEngine.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/MockSimulatorJobTracker.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEndToEnd.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEngine.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorEventQueue.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobClient.java hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorJobTracker.java 
hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/test/org/apache/hadoop/mapred/TestSimulatorTaskTracker.java Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt hadoop/mapreduce/branches/branch-0.21/build.xml hadoop/mapreduce/branches/branch-0.21/src/contrib/build-contrib.xml hadoop/mapreduce/branches/branch-0.21/src/contrib/build.xml hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/EagerTaskInitializationListener.java hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobInProgress.java hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/ZombieJob.java hadoop/mapreduce/branches/branch-0.21/src/tools/org/apache/hadoop/tools/rumen/ZombieJobProducer.java Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=818675&r1=818674&r2=818675&view=diff ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original) +++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Fri Sep 25 00:27:57 2009 @@ -123,6 +123,9 @@ MAPREDUCE-980. Modify JobHistory to use Avro for serialization. (cutting) + MAPREDUCE-728. Add Mumak, a Hadoop map/reduce simulator. (Arun C Murthy, + Tamas Sarlos, Anirban Dasgupta, Guanying Wang, and Hong Tang via cdouglas) + IMPROVEMENTS MAPREDUCE-816. Rename "local" mysql import to "direct" in Sqoop. 
Modified: hadoop/mapreduce/branches/branch-0.21/build.xml URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/build.xml?rev=818675&r1=818674&r2=818675&view=diff ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/build.xml (original) +++ hadoop/mapreduce/branches/branch-0.21/build.xml Fri Sep 25 00:27:57 2009 @@ -690,6 +690,7 @@ + @@ -720,11 +721,13 @@ + + + @@ -795,6 +799,7 @@ + @@ -839,6 +844,7 @@ + @@ -849,6 +855,7 @@ + Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/build-contrib.xml URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/build-contrib.xml?rev=818675&r1=818674&r2=818675&view=diff ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/contrib/build-contrib.xml (original) +++ hadoop/mapreduce/branches/branch-0.21/src/contrib/build-contrib.xml Fri Sep 25 00:27:57 2009 @@ -32,6 +32,7 @@ + @@ -70,8 +71,7 @@ - - + @@ -149,7 +149,7 @@ - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/conf/log4j.properties URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/conf/log4j.properties?rev=818675&view=auto ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/conf/log4j.properties (added) +++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/conf/log4j.properties Fri Sep 25 00:27:57 2009 @@ -0,0 +1,87 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# Define some default values that can be overridden by system properties +# + +mumak.root.logger=INFO,console,mumak +mumak.log.dir=. +mumak.log.file=mumak.log +mumak.log.layout=org.apache.log4j.PatternLayout +mumak.log.layout.pattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n + +# +# null == NullAppender +# + +log4j.appender.null=org.apache.log4j.varia.NullAppender + +# +# console == ConsoleAppender +# + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=${mumak.log.layout} +log4j.appender.console.layout.ConversionPattern=${mumak.log.layout.pattern} + +# +# general mumak output goes here +# +log4j.appender.mumak=org.apache.log4j.FileAppender +log4j.appender.mumak.File=${mumak.log.dir}/${mumak.log.file} +log4j.appender.mumak.layout=${mumak.log.layout} +log4j.appender.mumak.layout.ConversionPattern=${mumak.log.layout.pattern} + +# +# job summary output (commenting/uncommenting the following block +# to disable/enable the separate output of such information) +# +mumak.jsa.log.dir=${mumak.log.dir} +mumak.jsa.log.file=mumak-jobs-summary.log +mumak.jsa.logger=INFO,jsa +log4j.appender.jsa=org.apache.log4j.FileAppender +log4j.appender.jsa.File=${mumak.jsa.log.dir}/${mumak.jsa.log.file} +log4j.appender.jsa.layout=org.apache.log4j.PatternLayout +log4j.appender.jsa.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n 
+log4j.logger.org.apache.hadoop.mapred.JobInProgress$JobSummary=${mumak.jsa.logger} +log4j.additivity.org.apache.hadoop.mapred.JobInProgress$JobSummary=false + +# Set the root logger from the property "mumak.root.logger" (defined above; overridable by a system property). +log4j.rootLogger=${mumak.root.logger} + +# Logging Threshold (NOTE: log4j reads the key "log4j.threshold"; the misspelled key below is ignored) +log4j.threshhold=ALL + +# Custom Logging levels tuned for mumak + +log4j.logger.org.apache.hadoop.net.NetworkTopology=WARN +log4j.logger.org.apache.hadoop.mapred.JobTracker=WARN +log4j.logger.org.apache.hadoop.mapred.ResourceEstimator=WARN +log4j.logger.org.apache.hadoop.mapred.Counters=ERROR +log4j.logger.org.apache.hadoop.io.compress.CodecPool=WARN +log4j.logger.org.apache.hadoop.mapred.CompletedJobStatusStore=WARN +log4j.logger.org.apache.hadoop.mapred.EagerTaskInitializationListener=WARN +log4j.logger.org.apache.hadoop.util.HostsFileReader=WARN +# set the following level to WARN/ERROR to show/ignore situations where task +# info is missing in the trace +log4j.logger.org.apache.hadoop.tools.rumen.ZombieJob=ERROR +# set the following level to WARN/ERROR to show/ignore false alarms where tasks +# complete after the job has failed. 
+log4j.logger.org.apache.hadoop.mapred.JobInProgress=ERROR +#log4j.logger.org.apache.hadoop.mapred.TaskTracker=DEBUG +#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/conf/mumak.xml URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/conf/mumak.xml?rev=818675&view=auto ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/conf/mumak.xml (added) +++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/conf/mumak.xml Fri Sep 25 00:27:57 2009 @@ -0,0 +1,37 @@ + + + + + + + + + mumak.scale.racklocal + 1.5 + Scaling factor for task attempt runtime of rack-local over + node-local + + + + mumak.scale.rackremote + 3.0 + Scaling factor for task attempt runtime of rack-remote over + node-local + + + Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/ivy.xml URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/ivy.xml?rev=818675&view=auto ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/ivy.xml (added) +++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/ivy.xml Fri Sep 25 00:27:57 2009 @@ -0,0 +1,114 @@ + + + + + + + Mumak + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/ivy/libraries.properties URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/ivy/libraries.properties?rev=818675&view=auto ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/ivy/libraries.properties (added) +++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/ivy/libraries.properties Fri Sep 25 00:27:57 2009 @@ -0,0 +1,23 @@ +# Licensed to the Apache 
Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#This properties file lists the versions of the various artifacts used by mumak. +#It drives Ivy and the generation of a Maven POM. + +#Please list each dependency's name and version if it differs from the one +#listed in the global libraries.properties file (in alphabetical order). + +jackson.version=1.0.1 +aspectj.version=1.6.5 Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/AllMapsCompletedTaskAction.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/AllMapsCompletedTaskAction.java?rev=818675&view=auto ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/AllMapsCompletedTaskAction.java (added) +++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/AllMapsCompletedTaskAction.java Fri Sep 25 00:27:57 2009 @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * This class is used for notifying a SimulatorTaskTracker running a reduce task + * that all map tasks of the job are done. A SimulatorJobTracker notifies a + * SimulatorTaskTracker by sending this TaskTrackerAction in response to a + * heartbeat(). Represents a directive to start running the user code of the + * reduce task. + * + * We introduced this extra 'push' mechanism so that we don't have to implement + * the corresponding, more complicated 'pull' part of the InterTrackerProtocol. + * We do not use proper simulation Events for signaling, and hack heartbeat() + * instead, since the job tracker does not emit Events and does not know the + * recipient task tracker _Java_ object. + */ +class AllMapsCompletedTaskAction extends TaskTrackerAction { + /** Task attempt id of the reduce task that can proceed. */ + private final org.apache.hadoop.mapreduce.TaskAttemptID taskId; + + /** + * Constructs an AllMapsCompletedTaskAction object for a given + * {@link org.apache.hadoop.mapreduce.TaskAttemptID}. 
+ * + * @param taskId + * {@link org.apache.hadoop.mapreduce.TaskAttemptID} of the reduce + * task that can proceed + */ + public AllMapsCompletedTaskAction( + org.apache.hadoop.mapreduce.TaskAttemptID taskId) { + super(ActionType.LAUNCH_TASK); + this.taskId = taskId; + } + + /** + * Get the task attempt id of the reduce task. + * + * @return the {@link org.apache.hadoop.mapreduce.TaskAttemptID} of the + * task-attempt. + */ + public org.apache.hadoop.mapreduce.TaskAttemptID getTaskID() { + return taskId; + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + taskId.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + taskId.readFields(in); + } + + @Override + public String toString() { + return "AllMapsCompletedTaskAction[taskID=" + taskId + "]"; + } +} Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/EagerTaskInitializationListenerAspects.aj URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/EagerTaskInitializationListenerAspects.aj?rev=818675&view=auto ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/EagerTaskInitializationListenerAspects.aj (added) +++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/EagerTaskInitializationListenerAspects.aj Fri Sep 25 00:27:57 2009 @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +public aspect EagerTaskInitializationListenerAspects { + + pointcut overrideJobAdded (JobInProgressListener listener, JobInProgress job) : + call (void JobInProgressListener.jobAdded(JobInProgress)) && + target (listener) && + args (job); + + void around(JobInProgressListener listener, JobInProgress job) : + overrideJobAdded (listener, job) { + if (listener instanceof EagerTaskInitializationListener) { + ((EagerTaskInitializationListener)listener).ttm.initJob(job); + } else { + proceed(listener, job); + } + } + + pointcut overrideJobRemoved (JobInProgressListener listener, JobInProgress job) : + call (void JobInProgressListener.jobRemoved(JobInProgress)) && + target (listener) && + args(job); + + void around(JobInProgressListener listener, JobInProgress job) : + overrideJobRemoved (listener, job) { + if (listener instanceof EagerTaskInitializationListener) { + // no-op + } else { + proceed(listener, job); + } + } +} Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/HeartbeatEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/HeartbeatEvent.java?rev=818675&view=auto ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/HeartbeatEvent.java (added) +++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/HeartbeatEvent.java Fri Sep 
25 00:27:57 2009 @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +/** + * This class is used by {@link SimulatorTaskTracker}s for signaling themselves + * when the next heartbeat() call to the JobTracker is due. + */ +class HeartbeatEvent extends SimulatorEvent { + /** + * Constructor. 
+ * + * @param listener + * the {@link SimulatorTaskTracker} this event should be delivered to + * @param timestamp + * the time when this event is to be delivered + */ + public HeartbeatEvent(SimulatorEventListener listener, long timestamp) { + super(listener, timestamp); + } + +} Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/JobCompleteEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/JobCompleteEvent.java?rev=818675&view=auto ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/JobCompleteEvent.java (added) +++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/JobCompleteEvent.java Fri Sep 25 00:27:57 2009 @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +/** + * {@link JobCompleteEvent} is created by {@link SimulatorJobTracker} when a job + * is completed. {@link SimulatorJobClient} picks up the event and marks the job + * as completed. 
When all jobs are completed, the simulation is terminated. + */ +public class JobCompleteEvent extends SimulatorEvent { + + private SimulatorEngine engine; + private JobStatus jobStatus; + + public JobCompleteEvent(SimulatorJobClient jc, long timestamp, + JobStatus jobStatus, SimulatorEngine engine) { + super(jc, timestamp); + this.engine = engine; + this.jobStatus = jobStatus; + } + + public SimulatorEngine getEngine() { + return engine; + } + + public JobStatus getJobStatus() { + return jobStatus; + } + + @Override + protected String realToString() { + return super.realToString()+", status=("+jobStatus.toString()+")"; + } +} Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/JobSubmissionEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/JobSubmissionEvent.java?rev=818675&view=auto ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/JobSubmissionEvent.java (added) +++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/JobSubmissionEvent.java Fri Sep 25 00:27:57 2009 @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; +import org.apache.hadoop.tools.rumen.JobStory; + +/** + * {@link SimulatorEvent} for triggering the submission of a job to the job tracker. + */ +public class JobSubmissionEvent extends SimulatorEvent { + private final JobStory job; + + public JobSubmissionEvent(SimulatorEventListener listener, long timestamp, + JobStory job) { + super(listener, timestamp); + this.job = job; + } + + public JobStory getJob() { + return job; + } + + @Override + protected String realToString() { + return super.realToString() + ", jobID=" + job.getJobID(); + } +} Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorClock.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorClock.java?rev=818675&view=auto ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorClock.java (added) +++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorClock.java Fri Sep 25 00:27:57 2009 @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +/** + * A clock class - can be mocked out for testing. + */ +class SimulatorClock extends Clock { + + long currentTime; + + SimulatorClock (long now) { + super(); + currentTime = now; + } + void setTime(long now) { + currentTime = now; + } + + @Override + long getTime() { + return currentTime; + } +} Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java?rev=818675&view=auto ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java (added) +++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEngine.java Fri Sep 25 00:27:57 2009 @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.SimulatorEvent; +import org.apache.hadoop.mapred.SimulatorEventQueue; +import org.apache.hadoop.mapred.JobCompleteEvent; +import org.apache.hadoop.mapred.JobStatus; +import org.apache.hadoop.mapred.SimulatorJobClient; +import org.apache.hadoop.mapred.SimulatorJobTracker; +import org.apache.hadoop.mapred.SimulatorTaskTracker; +import org.apache.hadoop.net.DNSToSwitchMapping; +import org.apache.hadoop.net.StaticMapping; +import org.apache.hadoop.tools.rumen.ClusterStory; +import org.apache.hadoop.tools.rumen.JobStoryProducer; +import org.apache.hadoop.tools.rumen.MachineNode; +import org.apache.hadoop.tools.rumen.RackNode; +import org.apache.hadoop.tools.rumen.ZombieCluster; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +/** + * {@link SimulatorEngine} is the main class of the simulator. To launch the + * simulator, user can either run the main class directly with two parameters, + * input trace file and corresponding topology file, or use the script + * "bin/mumak.sh trace.json topology.json". Trace file and topology file are + * produced by rumen. 
+ */ +public class SimulatorEngine extends Configured implements Tool { + public static final List EMPTY_EVENTS = new ArrayList(); + private static final int DEFAULT_MAP_SLOTS_PER_NODE = 2; + private static final int DEFAULT_REDUCE_SLOTS_PER_NODE = 2; + + protected final SimulatorEventQueue queue = new SimulatorEventQueue(); + String traceFile; + String topologyFile; + SimulatorJobTracker jt; + SimulatorJobClient jc; + boolean shutdown = false; + long terminateTime = Long.MAX_VALUE; + long currentTime; + + /** + * Start simulated task trackers based on topology. + * @param clusterStory The cluster topology. + * @param now + * time stamp when the simulator is started, {@link SimulatorTaskTracker}s + * are started shortly after this time stamp + */ + void startTaskTrackers(ClusterStory clusterStory, long now) { + /** port assigned to TTs, incremented by 1 for each TT */ + int port = 10000; + long ms = now + 100; + + for (MachineNode node : clusterStory.getMachines()) { + String hostname = node.getName(); + RackNode rackNode = node.getRackNode(); + StaticMapping.addNodeToRack(hostname, rackNode.getName()); + String taskTrackerName = "tracker_" + hostname + ":localhost/127.0.0.1:" + + port; + port++; + SimulatorTaskTracker tt = new SimulatorTaskTracker(jt, taskTrackerName, + hostname, node.getMapSlots(), node.getReduceSlots()); + queue.addAll(tt.init(ms++)); + } + } + + /** + * Initiate components in the simulation. 
+ * @throws InterruptedException + * @throws IOException if trace or topology files cannot be open + */ + @SuppressWarnings("deprecation") + void init() throws InterruptedException, IOException { + long now = System.currentTimeMillis(); + + JobConf jobConf = new JobConf(getConf()); + jobConf.setClass("topology.node.switch.mapping.impl", + StaticMapping.class, DNSToSwitchMapping.class); + jobConf.set("fs.default.name", "file:///"); + jobConf.set("mapred.job.tracker", "localhost:8012"); + jobConf.setInt("mapred.jobtracker.job.history.block.size", 512); + jobConf.setInt("mapred.jobtracker.job.history.buffer.size", 512); + jobConf.setLong("mapred.tasktracker.expiry.interval", 5000); + jobConf.setInt("mapred.reduce.copy.backoff", 4); + jobConf.setLong("mapred.job.reuse.jvm.num.tasks", -1); + jobConf.setUser("mumak"); + jobConf.set("mapred.system.dir", + jobConf.get("hadoop.log.dir", "/tmp/hadoop-"+jobConf.getUser()) + "/mapred/system"); + jobConf.set("mapred.jobtracker.taskScheduler", JobQueueTaskScheduler.class.getName()); + + FileSystem lfs = FileSystem.getLocal(getConf()); + Path logPath = + new Path(System.getProperty("hadoop.log.dir")).makeQualified(lfs); + jobConf.set("mapred.system.dir", logPath.toString()); + jobConf.set("hadoop.job.history.location", (new Path(logPath, "history") + .toString())); + + jt = SimulatorJobTracker.startTracker(jobConf, now, this); + jt.offerService(); + + // max Map/Reduce tasks per node + int maxMaps = getConf().getInt("mapred.tasktracker.map.tasks.maximum", + DEFAULT_MAP_SLOTS_PER_NODE); + int maxReduces = getConf().getInt( + "mapred.tasktracker.reduce.tasks.maximum", + DEFAULT_REDUCE_SLOTS_PER_NODE); + + MachineNode defaultNode = new MachineNode.Builder("default", 2) + .setMapSlots(maxMaps).setReduceSlots(maxReduces).build(); + ZombieCluster cluster = new ZombieCluster(new Path(topologyFile), + defaultNode, jobConf); + long firstJobStartTime = now + 60000; + JobStoryProducer jobStoryProducer = new SimulatorJobStoryProducer( + new 
Path(traceFile), cluster, firstJobStartTime, jobConf); + + jc = new SimulatorJobClient(jt, jobStoryProducer); + queue.addAll(jc.init(firstJobStartTime)); + + // create TTs based on topology.json + startTaskTrackers(cluster, now); + + terminateTime = getConf().getLong("mumak.terminate.time", Long.MAX_VALUE); + if (terminateTime <= 0) { + throw new IllegalArgumentException("Terminate time must be positive: " + + terminateTime); + } + } + + /** + * The main loop of the simulation. First call init() to get objects ready, + * then go into the main loop, where {@link SimulatorEvent}s are handled removed from + * the {@link SimulatorEventQueue}, and new {@link SimulatorEvent}s are created and inserted + * into the {@link SimulatorEventQueue}. + * @throws IOException + * @throws InterruptedException + */ + void run() throws IOException, InterruptedException { + init(); + + for (SimulatorEvent next = queue.get(); next != null + && next.getTimeStamp() < terminateTime && !shutdown; next = queue.get()) { + currentTime = next.getTimeStamp(); + assert(currentTime == queue.getCurrentTime()); + SimulatorEventListener listener = next.getListener(); + List response = listener.accept(next); + queue.addAll(response); + } + + summary(System.out); + } + + /** + * Run after the main loop. + * @param out stream to output information about the simulation + */ + void summary(PrintStream out) { + out.println("Done, total events processed: " + queue.getEventCount()); + } + + public static void main(String[] args) throws Exception { + int res = ToolRunner.run(new Configuration(), new SimulatorEngine(), args); + System.exit(res); + } + + @Override + public int run(String[] args) throws Exception { + parseParameters(args); + try { + run(); + return 0; + } finally { + if (jt != null) { + jt.getTaskScheduler().terminate(); + } + } + } + + void parseParameters(String[] args) { + if (args.length != 2) { + throw new IllegalArgumentException("Usage: java ... 
SimulatorEngine trace.json topology.json"); + } + traceFile = args[0]; + topologyFile = args[1]; + } + + /** + * Called when a job is completed. Insert a {@link JobCompleteEvent} into the + * {@link SimulatorEventQueue}. This event will be picked up by + * {@link SimulatorJobClient}, which will in turn decide whether the + * simulation is done. + * @param jobStatus final status of a job, SUCCEEDED or FAILED + * @param timestamp time stamp when the job is completed + */ + void markCompletedJob(JobStatus jobStatus, long timestamp) { + queue.add(new JobCompleteEvent(jc, timestamp, jobStatus, this)); + } + + /** + * Called by {@link SimulatorJobClient} when the simulation is completed and + * should be stopped. + */ + void shutdown() { + shutdown = true; + } + + /** + * Get the current virtual time of the on-going simulation. It is defined by + * the time stamp of the last event handled. + * @return the current virtual time + */ + long getCurrentTime() { + return currentTime; + } +} Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEvent.java?rev=818675&view=auto ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEvent.java (added) +++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEvent.java Fri Sep 25 00:27:57 2009 @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +/** + * {@link SimulatorEvent} represents a specific event in Mumak. + * + * Each {@link SimulatorEvent} has an expected expiry time at which it is fired + * and an {@link SimulatorEventListener} which will handle the {@link SimulatorEvent} when + * it is fired. + */ +public abstract class SimulatorEvent { + protected final SimulatorEventListener listener; + protected final long timestamp; + protected long internalCount; + + protected SimulatorEvent(SimulatorEventListener listener, long timestamp) { + this.listener = listener; + this.timestamp = timestamp; + } + + /** + * Get the expected event expiry time. + * @return the expected event expiry time + */ + public long getTimeStamp() { + return timestamp; + } + + /** + * Get the {@link SimulatorEventListener} to handle the {@link SimulatorEvent}. + * @return the {@link SimulatorEventListener} to handle the {@link SimulatorEvent}. + */ + public SimulatorEventListener getListener() { + return listener; + } + + /** + * Get an internal counter of the {@link SimulatorEvent}. Each {@link SimulatorEvent} holds a + * counter, incremented on every event, to order multiple events that occur + * at the same time. + * @return internal counter of the {@link SimulatorEvent} + */ + long getInternalCount() { + return internalCount; + } + + /** + * Set the internal counter of the {@link SimulatorEvent}. 
+ * @param count value to set the internal counter + */ + void setInternalCount(long count) { + this.internalCount = count; + } + + @Override + public String toString() { + return this.getClass().getName() + "[" + realToString() + "]"; + } + + /** + * Converts the list of fields and values into a human readable format; + * it does not include the class name. + * Override this if you wanted your new fields to show up in toString(). + * + * @return String containing the list of fields and their values. + */ + protected String realToString() { + return "timestamp=" + timestamp + ", listener=" + listener; + } +} Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEventListener.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEventListener.java?rev=818675&view=auto ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEventListener.java (added) +++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEventListener.java Fri Sep 25 00:27:57 2009 @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.util.List; + +/** + * Interface for entities that handle events. + */ +public interface SimulatorEventListener { + /** + * Get the initial events to put in event queue. + * @param when time to schedule the initial events + * @return list of the initial events + */ + List init(long when) throws IOException; + + /** + * Process an event, generate more events to put in event queue. + * @param event the event to be processed + * @return list of generated events by processing this event + */ + List accept(SimulatorEvent event) throws IOException; +} Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEventQueue.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEventQueue.java?rev=818675&view=auto ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEventQueue.java (added) +++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorEventQueue.java Fri Sep 25 00:27:57 2009 @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; + +/** + * {@link SimulatorEventQueue} maintains a priority queue of events scheduled in the + * future in virtual time. Events happen in virtual time order. The + * {@link SimulatorEventQueue} has the notion of "currentTime" which is defined as time + * stamp of the last event already handled. An event can be inserted into the + * {@link SimulatorEventQueue}, and its time stamp must be later than "currentTime". + */ +public class SimulatorEventQueue { + public static final List EMPTY_EVENTS = new ArrayList(); + private SimulatorEvent lastEvent = null; + private long eventCount = 0; + private final PriorityQueue events = new PriorityQueue(1, + new Comparator() { + @Override + public int compare(SimulatorEvent o1, SimulatorEvent o2) { + if (o1.getTimeStamp() < o2.getTimeStamp()) { + return -1; + } else if (o1.getTimeStamp() > o2.getTimeStamp()) { + return 1; + } + if (o1.getInternalCount() < o2.getInternalCount()) { + return -1; + } else if (o1.getInternalCount() > o2.getInternalCount()) { + return 1; + } + return 0; + } + }); + + /** + * Get the next earliest {@link SimulatorEvent} to be handled. This {@link SimulatorEvent} has + * the smallest time stamp among all {@link SimulatorEvent}s currently scheduled in the + * {@link SimulatorEventQueue}. + * + * @return the next {@link SimulatorEvent} to be handled. Or null if no more events. 
+ */ + public SimulatorEvent get() { + lastEvent = events.poll(); + return lastEvent; + } + + /** + * Add a single {@link SimulatorEvent} to the {@link SimulatorEventQueue}. + * + * @param event + * the {@link SimulatorEvent} + * @return true if the event is added to the queue (to follow the same + * convention as Collection.add()). + */ + public boolean add(SimulatorEvent event) { + if (lastEvent != null && event.getTimeStamp() < lastEvent.getTimeStamp()) + throw new IllegalArgumentException("Event happens in the past: " + + event.getClass()); + + event.setInternalCount(eventCount++); + return events.add(event); + } + + /** + * Adding all {@link SimulatorEvent}s. + * + * @param events + * The container contains all the events to be added. + * @return true if the queue is changed as a result of the call (to follow the + * same convention as Collection.addAll()). + */ + public boolean addAll(Collection events) { + long lastTimeStamp = (lastEvent == null) ? Long.MIN_VALUE : lastEvent + .getTimeStamp(); + for (SimulatorEvent e : events) { + if (e.getTimeStamp() < lastTimeStamp) { + throw new IllegalArgumentException("Event happens in the past: " + + e.getClass() + "(" + e.getTimeStamp() + "<" + lastTimeStamp); + } + e.setInternalCount(eventCount++); + } + return this.events.addAll(events); + } + + /** + * Get the current time in the queue. It is defined by time stamp of the last + * event handled. + * + * @return the current time in the queue + */ + public long getCurrentTime() { + if (lastEvent != null) + return lastEvent.getTimeStamp(); + else + return 0; + } + + /** + * Get the size of currently scheduled events. Number of events in the system + * is the major scaling factor of the simulator. + * + * @return the size of currently scheduled events + */ + public int getSize() { + return events.size(); + } + + /** + * Get the total number of events handled in a simulation. This is an + * indicator of how large a particular simulation run is. 
+ * + * @return the total number of events handled in a simulation + */ + public long getEventCount() { + return eventCount; + } +} Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobCache.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobCache.java?rev=818675&view=auto ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobCache.java (added) +++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobCache.java Fri Sep 25 00:27:57 2009 @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.tools.rumen.JobStory; + +/** + * A static ({@link JobID}, {@link JobStory}) mapping, used by {@link JobClient} + * and {@link JobTracker} for job submission. 
+ */ +public class SimulatorJobCache { + private static Map submittedJobs = new HashMap(); + + /** + * Put ({@link JobID}, {@link JobStory}) into the mapping. + * @param jobId id of the job. + * @param job {@link JobStory} object of the job. + */ + public static void put(JobID jobId, JobStory job) { + submittedJobs.put(jobId, job); + } + + /** + * Get the job identified by {@link JobID} and remove it from the mapping. + * @param jobId id of the job. + * @return {@link JobStory} object of the job. + */ + public static JobStory get(JobID jobId) { + return submittedJobs.remove(jobId); + } + + /** + * Check the job at the head of queue, without removing it from the mapping. + * @param jobId id of the job. + * @return {@link JobStory} object of the job. + */ + public static JobStory peek(JobID jobId) { + return submittedJobs.get(jobId); + } +} Added: hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java?rev=818675&view=auto ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java (added) +++ hadoop/mapreduce/branches/branch-0.21/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobClient.java Fri Sep 25 00:27:57 2009 @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.protocol.ClientProtocol; +import org.apache.hadoop.tools.rumen.JobStory; +import org.apache.hadoop.tools.rumen.JobStoryProducer; + +/** + * Class that simulates a job client. It's main functionality is to submit jobs + * to the simulation engine, and shutdown the simulation engine if the job + * producer runs out of jobs. + * TODO: Change System.out.printXX to LOG.xxx. + */ +public class SimulatorJobClient implements SimulatorEventListener { + private final ClientProtocol jobTracker; + private final JobStoryProducer jobStoryProducer; + private Set runningJobs = new LinkedHashSet(); + private boolean noMoreJobs = false; + + /** + * Constructor. + * + * @param jobTracker + * The job tracker where we submit job to. Note that the {@link + * SimulatorJobClient} interacts with the JobTracker through the + * {@link ClientProtocol}. 
+ * @param jobStoryProducer + */ + public SimulatorJobClient(ClientProtocol jobTracker, JobStoryProducer jobStoryProducer) { + this.jobTracker = jobTracker; + this.jobStoryProducer = jobStoryProducer; + } + + @Override + public List init(long when) throws IOException { + JobStory job = jobStoryProducer.getNextJob(); + if (job.getSubmissionTime() != when) { + throw new IOException("Inconsistent submission time for the first job: " + + when + " != " + job.getSubmissionTime()+"."); + } + JobSubmissionEvent event = new JobSubmissionEvent(this, when, job); + return Collections. singletonList(event); + } + + @Override + public List accept(SimulatorEvent event) + throws IOException { + if (event instanceof JobSubmissionEvent) { + JobSubmissionEvent submitEvent = (JobSubmissionEvent)(event); + + // Submit job + JobStatus status = null; + try { + status = submitJob(submitEvent.getJob()); + } catch (InterruptedException e) { + throw new IOException(e); + } + runningJobs.add(status.getJobID()); + System.out.println("Job " + status.getJobID() + + " is submitted at " + submitEvent.getTimeStamp()); + + JobStory nextJob = jobStoryProducer.getNextJob(); + if (nextJob == null) { + noMoreJobs = true; + return SimulatorEngine.EMPTY_EVENTS; + } + + return Collections.singletonList( + new JobSubmissionEvent(this, nextJob.getSubmissionTime(), nextJob)); + } else if (event instanceof JobCompleteEvent) { + JobCompleteEvent jobCompleteEvent = (JobCompleteEvent)event; + JobStatus jobStatus = jobCompleteEvent.getJobStatus(); + System.out.println("Job " + jobStatus.getJobID() + + " completed at " + jobCompleteEvent.getTimeStamp() + + " with status: " + jobStatus.getState() + + " runtime: " + + (jobCompleteEvent.getTimeStamp() - jobStatus.getStartTime())); + runningJobs.remove(jobCompleteEvent.getJobStatus().getJobID()); + if (noMoreJobs && runningJobs.isEmpty()) { + jobCompleteEvent.getEngine().shutdown(); + } + return SimulatorEngine.EMPTY_EVENTS; + } else { + throw new 
IllegalArgumentException("unknown event type: " + event.getClass()); + } + } + + @SuppressWarnings("deprecation") + private JobStatus submitJob(JobStory job) + throws IOException, InterruptedException { + // honor the JobID from JobStory first. + JobID jobId = job.getJobID(); + if (jobId == null) { + // If not available, obtain JobID from JobTracker. + jobId = jobTracker.getNewJobID(); + } + + SimulatorJobCache.put(org.apache.hadoop.mapred.JobID.downgrade(jobId), job); + return jobTracker.submitJob(jobId); + } +}