Author: cdouglas
Date: Fri Jan 8 01:49:37 2010
New Revision: 897076
URL: http://svn.apache.org/viewvc?rev=897076&view=rev
Log:
MAPREDUCE-1317. Reduce the memory footprint of Rumen objects by interning
host Strings. Contributed by Hong Tang
Added:
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestConcurrentRead.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=897076&r1=897075&r2=897076&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jan 8 01:49:37 2010
@@ -103,6 +103,9 @@
MAPREDUCE-1295. Add a tool in Rumen for folding and manipulating job
traces. (Dick King via cdouglas)
+ MAPREDUCE-1317. Reduce the memory footprint of Rumen objects by interning
+ host Strings. (Hong Tang via cdouglas)
+
OPTIMIZATIONS
MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestConcurrentRead.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestConcurrentRead.java?rev=897076&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestConcurrentRead.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestConcurrentRead.java Fri Jan 8 01:49:37 2010
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestConcurrentRead {
+ static final List<LoggedJob> cachedTrace = new ArrayList<LoggedJob>();
+ static final String traceFile =
+ "rumen/small-trace-test/job-tracker-logs-trace-output.gz";
+
+ static Configuration conf;
+ static FileSystem lfs;
+ static Path path;
+
+ @BeforeClass
+ static public void globalSetUp() throws IOException {
+ conf = new Configuration();
+ lfs = FileSystem.getLocal(conf);
+ Path rootInputDir = new Path(System.getProperty("test.tools.input.dir", ""))
+ .makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
+ path = new Path(rootInputDir, traceFile);
+ JobTraceReader reader = new JobTraceReader(path, conf);
+ try {
+ LoggedJob job;
+ while ((job = reader.getNext()) != null) {
+ cachedTrace.add(job);
+ }
+ } finally {
+ reader.close();
+ }
+ }
+
+ void readAndCompare() throws IOException {
+ JobTraceReader reader = new JobTraceReader(path, conf);
+ try {
+ for (Iterator<LoggedJob> it = cachedTrace.iterator(); it.hasNext();) {
+ LoggedJob jobExpected = it.next();
+ LoggedJob jobRead = reader.getNext();
+ assertNotNull(jobRead);
+ try {
+ jobRead.deepCompare(jobExpected, null);
+ } catch (DeepInequalityException e) {
+ fail(e.toString());
+ }
+ }
+ assertNull(reader.getNext());
+ } finally {
+ reader.close();
+ }
+ }
+
+ class TestThread extends Thread {
+ final int repeat;
+ final CountDownLatch startSignal, doneSignal;
+ final Map<String, Throwable> errors;
+
+ TestThread(int id, int repeat, CountDownLatch startSignal, CountDownLatch doneSignal,
Map<String, Throwable> errors) {
+ super(String.format("TestThread-%d", id));
+ this.repeat = repeat;
+ this.startSignal = startSignal;
+ this.doneSignal = doneSignal;
+ this.errors = errors;
+ }
+
+ @Override
+ public void run() {
+ try {
+ startSignal.await();
+ for (int i = 0; i < repeat; ++i) {
+ try {
+ readAndCompare();
+ } catch (Throwable e) {
+ errors.put(getName(), e);
+ break;
+ }
+ }
+ doneSignal.countDown();
+ } catch (Throwable e) {
+ errors.put(getName(), e);
+ }
+ }
+ }
+
+ @Test
+ public void testConcurrentRead() throws InterruptedException {
+ int nThr = conf.getInt("test.rumen.concurrent-read.threads", 4);
+ int repeat = conf.getInt("test.rumen.concurrent-read.repeat", 10);
+ CountDownLatch startSignal = new CountDownLatch(1);
+ CountDownLatch doneSignal = new CountDownLatch(nThr);
+ Map<String, Throwable> errors = Collections
+ .synchronizedMap(new TreeMap<String, Throwable>());
+ for (int i = 0; i < nThr; ++i) {
+ new TestThread(i, repeat, startSignal, doneSignal, errors).start();
+ }
+ startSignal.countDown();
+ doneSignal.await();
+ if (!errors.isEmpty()) {
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<String, Throwable> e : errors.entrySet()) {
+ sb.append(String.format("%s:\n%s\n", e.getKey(), e.getValue().toString()));
+ }
+ fail(sb.toString());
+ }
+ }
+}
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java?rev=897076&r1=897075&r2=897076&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java Fri Jan 8 01:49:37 2010
@@ -1285,7 +1285,7 @@
attempt.setLocation(host.makeLoggedLocation());
}
- ArrayList<LoggedLocation> locs = task.getPreferredLocations();
+ List<LoggedLocation> locs = task.getPreferredLocations();
if (host != null && locs != null) {
for (LoggedLocation loc : locs) {
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java?rev=897076&r1=897075&r2=897076&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java Fri Jan 8 01:49:37 2010
@@ -18,7 +18,10 @@
package org.apache.hadoop.tools.rumen;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@@ -41,12 +44,15 @@
*
*/
public class LoggedLocation implements DeepCompare {
+ static final Map<List<String>, List<String>> layersCache =
+ new HashMap<List<String>, List<String>>();
+
/**
* The full path from the root of the network to the host.
*
* NOTE that this assumes that the network topology is a tree.
*/
- List<String> layers = new ArrayList<String>();
+ List<String> layers = Collections.emptyList();
static private Set<String> alreadySeenAnySetterAttributes =
new TreeSet<String>();
@@ -56,7 +62,26 @@
}
// Stores the network path for this location. A null or empty argument is
// replaced by Collections.emptyList(). Any other list is looked up in
// layersCache while holding its monitor: on a miss, an unmodifiable copy
// holding interned strings is cached and stored; on a hit, the cached list
// is stored, so equal paths end up sharing one list instance.
void setLayers(List<String> layers) {
- this.layers = layers;
+ if (layers == null || layers.isEmpty()) {
+ this.layers = Collections.emptyList();
+ } else {
+ synchronized (layersCache) {
+ List<String> found = layersCache.get(layers);
+ if (found == null) {
+ // make a copy with interned string.
+ List<String> clone = new ArrayList<String>(layers.size());
+ for (String s : layers) {
+ clone.add(s.intern());
+ }
+ // making it read-only as we are sharing them.
+ List<String> readonlyLayers = Collections.unmodifiableList(clone);
+ layersCache.put(readonlyLayers, readonlyLayers);
+ this.layers = readonlyLayers;
+ } else {
+ this.layers = found;
+ }
+ }
+ }
}
@SuppressWarnings("unused")
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java?rev=897076&r1=897075&r2=897076&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java Fri Jan 8 01:49:37 2010
@@ -18,6 +18,7 @@
package org.apache.hadoop.tools.rumen;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
@@ -44,9 +45,7 @@
Pre21JobHistoryConstants.Values taskType;
Pre21JobHistoryConstants.Values taskStatus;
List<LoggedTaskAttempt> attempts = new ArrayList<LoggedTaskAttempt>();
-
- ArrayList<LoggedLocation> preferredLocations =
- new ArrayList<LoggedLocation>();
+ List<LoggedLocation> preferredLocations = Collections.emptyList();
int numberMaps = -1;
int numberReduces = -1;
@@ -139,15 +138,23 @@
}
// Replaces the attempt list. A null argument is stored as a new, empty
// ArrayList instead of null; any other list is kept by reference.
void setAttempts(List<LoggedTaskAttempt> attempts) {
- this.attempts = attempts;
+ if (attempts == null) {
+ this.attempts = new ArrayList<LoggedTaskAttempt>();
+ } else {
+ this.attempts = attempts;
+ }
}
// Returns the stored preferred-locations list itself; no copy is made.
- public ArrayList<LoggedLocation> getPreferredLocations() {
+ public List<LoggedLocation> getPreferredLocations() {
return preferredLocations;
}
// Stores the preferred locations. A null or empty argument is replaced by
// Collections.emptyList(); any other list is kept by reference.
- void setPreferredLocations(ArrayList<LoggedLocation> preferredLocations) {
- this.preferredLocations = preferredLocations;
+ void setPreferredLocations(List<LoggedLocation> preferredLocations) {
+ if (preferredLocations == null || preferredLocations.isEmpty()) {
+ this.preferredLocations = Collections.emptyList();
+ } else {
+ this.preferredLocations = preferredLocations;
+ }
}
public int getNumberMaps() {
@@ -213,8 +220,8 @@
}
}
- private void compareLoggedLocations(ArrayList<LoggedLocation> c1,
- ArrayList<LoggedLocation> c2, TreePath loc, String eltname)
+ private void compareLoggedLocations(List<LoggedLocation> c1,
+ List<LoggedLocation> c2, TreePath loc, String eltname)
throws DeepInequalityException {
if (c1 == null && c2 == null) {
return;
Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java?rev=897076&r1=897075&r2=897076&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java Fri Jan 8 01:49:37 2010
@@ -140,7 +140,7 @@
}
void setHostName(String hostName) {
- this.hostName = hostName;
+ // Store the interned host name so equal names share one String instance.
+ // A null name is kept as null, as the plain assignment did; calling
+ // intern() on it would throw a NullPointerException.
+ this.hostName = hostName == null ? null : hostName.intern();
}
public long getHdfsBytesRead() {
|