Return-Path: Delivered-To: apmail-lucene-hadoop-commits-archive@locus.apache.org Received: (qmail 93179 invoked from network); 27 Jun 2007 17:53:39 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 27 Jun 2007 17:53:39 -0000 Received: (qmail 81322 invoked by uid 500); 27 Jun 2007 17:53:42 -0000 Delivered-To: apmail-lucene-hadoop-commits-archive@lucene.apache.org Received: (qmail 81301 invoked by uid 500); 27 Jun 2007 17:53:42 -0000 Mailing-List: contact hadoop-commits-help@lucene.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hadoop-dev@lucene.apache.org Delivered-To: mailing list hadoop-commits@lucene.apache.org Received: (qmail 81292 invoked by uid 99); 27 Jun 2007 17:53:42 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 Jun 2007 10:53:42 -0700 X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 Jun 2007 10:53:38 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 165E61A981A; Wed, 27 Jun 2007 10:53:18 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r551245 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/MapOutputLocation.java src/java/org/apache/hadoop/mapred/ReduceTask.java src/java/org/apache/hadoop/mapred/TaskTracker.java Date: Wed, 27 Jun 2007 17:53:17 -0000 To: hadoop-commits@lucene.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070627175318.165E61A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cutting Date: Wed Jun 27 10:53:16 2007 New Revision: 551245 URL: http://svn.apache.org/viewvc?view=rev&rev=551245 Log: HADOOP-1485. Add metrics for monitoring shuffle. Contributed by Devaraj. Modified: lucene/hadoop/trunk/CHANGES.txt lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Modified: lucene/hadoop/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=551245&r1=551244&r2=551245 ============================================================================== --- lucene/hadoop/trunk/CHANGES.txt (original) +++ lucene/hadoop/trunk/CHANGES.txt Wed Jun 27 10:53:16 2007 @@ -264,6 +264,9 @@ 80. HADOOP-1028. Add log messages for server startup and shutdown. (Tsz Wo Sze via cutting) + 81. HADOOP-1485. Add metrics for monitoring shuffle. + (Devaraj Das via cutting) + Release 0.13.0 - 2007-06-08 Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?view=diff&rev=551245&r1=551244&r2=551245 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Wed Jun 27 10:53:16 2007 @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.io.*; -import org.apache.hadoop.metrics.MetricsRecord; +import org.apache.hadoop.mapred.ReduceTask.ReduceCopier.ShuffleClientMetrics; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.conf.*; @@ -187,7 +187,7 @@ */ public Path getFile(InMemoryFileSystem inMemFileSys, FileSystem localFileSys, - MetricsRecord shuffleMetrics, + ShuffleClientMetrics shuffleMetrics, Path localFilename, LocalDirAllocator lDirAlloc, Configuration conf, int reduce, @@ -240,8 +240,7 @@ int len = input.read(buffer); while (len > 0) { totalBytes += len; - shuffleMetrics.incrMetric("shuffle_input_bytes", len); - shuffleMetrics.update(); + shuffleMetrics.inputBytes(len); output.write(buffer, 0 , len); if (currentThread.isInterrupted()) { throw new InterruptedException(); Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=551245&r1=551244&r2=551245 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Jun 27 10:53:16 2007 @@ -58,6 +58,7 @@ import org.apache.hadoop.metrics.MetricsContext; import org.apache.hadoop.metrics.MetricsRecord; import org.apache.hadoop.metrics.MetricsUtil; +import org.apache.hadoop.metrics.Updater; import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; @@ -342,7 +343,7 @@ done(umbilical); } - private class ReduceCopier implements MRConstants { + class ReduceCopier implements MRConstants { /** Reference to the umbilical object */ private TaskUmbilicalProtocol umbilical; @@ -431,9 +432,9 @@ private MapOutputCopier[] copiers = null; /** - * The threads for fetching the files. + * The object for metrics reporting. */ - private MetricsRecord shuffleMetrics = null; + private ShuffleClientMetrics shuffleClientMetrics = null; /** * the minimum interval between tasktracker polls @@ -464,6 +465,65 @@ */ private long ramfsMergeOutputSize; + /** + * This class contains the methods that should be used for metrics-reporting + * the specific metrics for shuffle. This class actually reports the + * metrics for the shuffle client (the ReduceTask), and hence the name + * ShuffleClientMetrics. + */ + class ShuffleClientMetrics implements Updater { + private MetricsRecord shuffleMetrics = null; + private int numFailedFetches = 0; + private int numSuccessFetches = 0; + private long numBytes = 0; + private int numThreadsBusy = 0; + ShuffleClientMetrics(JobConf conf) { + MetricsContext metricsContext = MetricsUtil.getContext("mapred"); + this.shuffleMetrics = + MetricsUtil.createRecord(metricsContext, "shuffleInput"); + this.shuffleMetrics.setTag("user", conf.getUser()); + this.shuffleMetrics.setTag("jobName", conf.getJobName()); + this.shuffleMetrics.setTag("jobId", ReduceTask.this.getJobId()); + this.shuffleMetrics.setTag("taskId", getTaskId()); + this.shuffleMetrics.setTag("sessionId", conf.getSessionId()); + metricsContext.registerUpdater(this); + } + public synchronized void inputBytes(long numBytes) { + this.numBytes += numBytes; + } + public synchronized void failedFetch() { + ++numFailedFetches; + } + public synchronized void successFetch() { + ++numSuccessFetches; + } + public synchronized void threadBusy() { + ++numThreadsBusy; + } + public synchronized void threadFree() { + --numThreadsBusy; + } + public void doUpdates(MetricsContext unused) { + synchronized (this) { + shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes); + shuffleMetrics.incrMetric("shuffle_failed_fetches", + numFailedFetches); + shuffleMetrics.incrMetric("shuffle_success_fetches", + numSuccessFetches); + if (numCopiers != 0) { + shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", + 100*((float)numThreadsBusy/numCopiers)); + } else { + shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0); + } + numBytes = 0; + numSuccessFetches = 0; + numFailedFetches = 0; + } + shuffleMetrics.update(); + } + } + /** Represents the result of an attempt to copy a map output */ private class CopyResult { @@ -567,13 +627,17 @@ } try { + shuffleClientMetrics.threadBusy(); start(loc); size = copyOutput(loc); + shuffleClientMetrics.successFetch(); } catch (IOException e) { LOG.warn(reduceTask.getTaskId() + " copy failed: " + loc.getMapTaskId() + " from " + loc.getHost()); LOG.warn(StringUtils.stringifyException(e)); + shuffleClientMetrics.failedFetch(); } finally { + shuffleClientMetrics.threadFree(); finish(size); } } catch (InterruptedException e) { @@ -607,7 +671,7 @@ // a working filename that will be unique to this attempt Path tmpFilename = new Path(filename + "-" + id); // this copies the map output file - tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleMetrics, + tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleClientMetrics, tmpFilename, lDirAlloc, conf, reduceTask.getPartition(), STALLED_COPY_TIMEOUT, reporter); @@ -712,6 +776,7 @@ throws IOException { configureClasspath(conf); + this.shuffleClientMetrics = new ShuffleClientMetrics(conf); this.umbilical = umbilical; this.reduceTask = ReduceTask.this; this.scheduledCopies = new ArrayList(100); @@ -743,11 +808,6 @@ this.lastPollTime = 0; - MetricsContext metricsContext = MetricsUtil.getContext("mapred"); - this.shuffleMetrics = - MetricsUtil.createRecord(metricsContext, "shuffleInput"); - this.shuffleMetrics.setTag("user", conf.getUser()); - this.shuffleMetrics.setTag("sessionId", conf.getSessionId()); // Seed the random number generator with a reasonably globally unique seed long randomSeed = System.nanoTime() + Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=551245&r1=551244&r2=551245 ============================================================================== --- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Jun 27 10:53:16 2007 @@ -148,6 +148,7 @@ private int failures; private int finishedCount[] = new int[1]; private MapEventsFetcherThread mapEventsFetcher; + int workerThreads; /** * the minimum interval between jobtracker polls */ @@ -157,6 +158,60 @@ */ private int probe_sample_size = 50; + private ShuffleServerMetrics shuffleServerMetrics; + /** This class contains the methods that should be used for metrics-reporting + * the specific metrics for shuffle. The TaskTracker is actually a server for + * the shuffle and hence the name ShuffleServerMetrics. + */ + private class ShuffleServerMetrics implements Updater { + private MetricsRecord shuffleMetricsRecord = null; + private int serverHandlerBusy = 0; + private long outputBytes = 0; + private int failedOutputs = 0; + private int successOutputs = 0; + ShuffleServerMetrics(JobConf conf) { + MetricsContext context = MetricsUtil.getContext("mapred"); + shuffleMetricsRecord = + MetricsUtil.createRecord(context, "shuffleOutput"); + this.shuffleMetricsRecord.setTag("sessionId", conf.getSessionId()); + context.registerUpdater(this); + } + synchronized void serverHandlerBusy() { + ++serverHandlerBusy; + } + synchronized void serverHandlerFree() { + --serverHandlerBusy; + } + synchronized void outputBytes(long bytes) { + outputBytes += bytes; + } + synchronized void failedOutput() { + ++failedOutputs; + } + synchronized void successOutput() { + ++successOutputs; + } + public void doUpdates(MetricsContext unused) { + synchronized (this) { + if (workerThreads != 0) { + shuffleMetricsRecord.setMetric("shuffle_handler_busy_percent", + 100*((float)serverHandlerBusy/workerThreads)); + } else { + shuffleMetricsRecord.setMetric("shuffle_handler_busy_percent", 0); + } + shuffleMetricsRecord.incrMetric("shuffle_output_bytes", + outputBytes); + shuffleMetricsRecord.incrMetric("shuffle_failed_outputs", + failedOutputs); + shuffleMetricsRecord.incrMetric("shuffle_success_outputs", + successOutputs); + outputBytes = 0; + failedOutputs = 0; + successOutputs = 0; + } + shuffleMetricsRecord.update(); + } + } private class TaskTrackerMetrics implements Updater { private MetricsRecord metricsRecord = null; private int numCompletedTasks = 0; @@ -663,7 +718,8 @@ int httpPort = conf.getInt("tasktracker.http.port", 50060); String httpBindAddress = conf.get("tasktracker.http.bindAddress", "0.0.0.0"); this.server = new StatusHttpServer("task", httpBindAddress, httpPort, true); - int workerThreads = conf.getInt("tasktracker.http.threads", 40); + workerThreads = conf.getInt("tasktracker.http.threads", 40); + this.shuffleServerMetrics = new ShuffleServerMetrics(fConf); server.setThreads(1, workerThreads); // let the jsp pages get to the task tracker, config, and other relevant // objects @@ -674,6 +730,7 @@ server.setAttribute("conf", conf); server.setAttribute("log", LOG); server.setAttribute("localDirAllocator", localDirAllocator); + server.setAttribute("shuffleServerMetrics", shuffleServerMetrics); server.addServlet("mapOutput", "/mapOutput", MapOutputServlet.class); server.start(); this.httpPort = server.getPort(); @@ -1839,27 +1896,31 @@ ServletContext context = getServletContext(); int reduce = Integer.parseInt(reduceId); byte[] buffer = new byte[MAX_BYTES_TO_READ]; - OutputStream outStream = response.getOutputStream(); - JobConf conf = (JobConf) context.getAttribute("conf"); - LocalDirAllocator lDirAlloc = - (LocalDirAllocator)context.getAttribute("localDirAllocator"); - FileSystem fileSys = - (FileSystem) context.getAttribute("local.file.system"); - - // Index file - Path indexFileName = lDirAlloc.getLocalPathToRead( - mapId+"/file.out.index", conf); - FSDataInputStream indexIn = null; - - // Map-output file - Path mapOutputFileName = lDirAlloc.getLocalPathToRead( - mapId+"/file.out", conf); - FSDataInputStream mapOutputIn = null; - // true iff IOException was caused by attempt to access input boolean isInputException = true; - + OutputStream outStream = null; + FSDataInputStream indexIn = null; + FSDataInputStream mapOutputIn = null; + + ShuffleServerMetrics shuffleMetrics = (ShuffleServerMetrics) + context.getAttribute("shuffleServerMetrics"); try { + shuffleMetrics.serverHandlerBusy(); + outStream = response.getOutputStream(); + JobConf conf = (JobConf) context.getAttribute("conf"); + LocalDirAllocator lDirAlloc = + (LocalDirAllocator)context.getAttribute("localDirAllocator"); + FileSystem fileSys = + (FileSystem) context.getAttribute("local.file.system"); + + // Index file + Path indexFileName = lDirAlloc.getLocalPathToRead( + mapId+"/file.out.index", conf); + + // Map-output file + Path mapOutputFileName = lDirAlloc.getLocalPathToRead( + mapId+"/file.out", conf); + /** * Read the index file to get the information about where * the map-output for the given reducer is available. @@ -1899,6 +1960,7 @@ ? (int)partLength : MAX_BYTES_TO_READ); while (len > 0) { try { + shuffleMetrics.outputBytes(len); outStream.write(buffer, 0, len); outStream.flush(); } catch (IOException ie) { @@ -1923,6 +1985,7 @@ tracker.mapOutputLost(mapId, errorMsg); } response.sendError(HttpServletResponse.SC_GONE, errorMsg); + shuffleMetrics.failedOutput(); throw ie; } finally { if (indexIn != null) { @@ -1931,8 +1994,10 @@ if (mapOutputIn != null) { mapOutputIn.close(); } + shuffleMetrics.serverHandlerFree(); } outStream.close(); + shuffleMetrics.successOutput(); } } }