hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r897076 - in /hadoop/mapreduce/trunk: ./ src/test/mapred/org/apache/hadoop/tools/rumen/ src/tools/org/apache/hadoop/tools/rumen/
Date Fri, 08 Jan 2010 01:49:47 GMT
Author: cdouglas
Date: Fri Jan  8 01:49:37 2010
New Revision: 897076

URL: http://svn.apache.org/viewvc?rev=897076&view=rev
Log:
MAPREDUCE-1317. Reduce the memory footprint of Rumen objects by interning
host Strings. Contributed by Hong Tang

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestConcurrentRead.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
    hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=897076&r1=897075&r2=897076&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jan  8 01:49:37 2010
@@ -103,6 +103,9 @@
     MAPREDUCE-1295. Add a tool in Rumen for folding and manipulating job
     traces. (Dick King via cdouglas)
 
+    MAPREDUCE-1317. Reduce the memory footprint of Rumen objects by interning
+    host Strings. (Hong Tang via cdouglas)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestConcurrentRead.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestConcurrentRead.java?rev=897076&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestConcurrentRead.java
(added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/tools/rumen/TestConcurrentRead.java
Fri Jan  8 01:49:37 2010
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.tools.rumen;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestConcurrentRead {
+  static final List<LoggedJob> cachedTrace = new ArrayList<LoggedJob>();
+  static final String traceFile = 
+      "rumen/small-trace-test/job-tracker-logs-trace-output.gz";
+  
+  static Configuration conf;
+  static FileSystem lfs;
+  static Path path;
+  
+  @BeforeClass
+  static public void globalSetUp() throws IOException {
+    conf = new Configuration();
+    lfs = FileSystem.getLocal(conf);
+    Path rootInputDir = new Path(System.getProperty("test.tools.input.dir", ""))
+        .makeQualified(lfs.getUri(), lfs.getWorkingDirectory());
+    path = new Path(rootInputDir, traceFile);
+    JobTraceReader reader = new JobTraceReader(path, conf);
+    try {
+      LoggedJob job;
+      while ((job = reader.getNext()) != null) {
+        cachedTrace.add(job);
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
+  void readAndCompare() throws IOException {
+    JobTraceReader reader = new JobTraceReader(path, conf);
+    try {
+      for (Iterator<LoggedJob> it = cachedTrace.iterator(); it.hasNext();) {
+        LoggedJob jobExpected = it.next();
+        LoggedJob jobRead = reader.getNext();
+        assertNotNull(jobRead);
+        try {
+          jobRead.deepCompare(jobExpected, null);
+        } catch (DeepInequalityException e) {
+          fail(e.toString());
+        }
+      }
+      assertNull(reader.getNext());
+    } finally {
+      reader.close();
+    }
+  }
+
+  class TestThread extends Thread {
+    final int repeat;
+    final CountDownLatch startSignal, doneSignal;
+    final Map<String, Throwable> errors;
+
+    TestThread(int id, int repeat, CountDownLatch startSignal, CountDownLatch doneSignal,
Map<String, Throwable> errors) {
+      super(String.format("TestThread-%d", id));
+      this.repeat = repeat;
+      this.startSignal = startSignal;
+      this.doneSignal = doneSignal;
+      this.errors = errors;
+    }
+
+    @Override
+    public void run() {
+      try {
+        startSignal.await();
+        for (int i = 0; i < repeat; ++i) {
+          try {
+            readAndCompare();
+          } catch (Throwable e) {
+            errors.put(getName(), e);
+            break;
+          }
+        }
+        doneSignal.countDown();
+      } catch (Throwable e) {
+        errors.put(getName(), e);
+      }
+    }
+  }
+
+  @Test
+  public void testConcurrentRead() throws InterruptedException {
+    int nThr = conf.getInt("test.rumen.concurrent-read.threads", 4);
+    int repeat = conf.getInt("test.rumen.concurrent-read.repeat", 10);
+    CountDownLatch startSignal = new CountDownLatch(1);
+    CountDownLatch doneSignal = new CountDownLatch(nThr);
+    Map<String, Throwable> errors = Collections
+        .synchronizedMap(new TreeMap<String, Throwable>());
+    for (int i = 0; i < nThr; ++i) {
+      new TestThread(i, repeat, startSignal, doneSignal, errors).start();
+    }
+    startSignal.countDown();
+    doneSignal.await();
+    if (!errors.isEmpty()) {
+      StringBuilder sb = new StringBuilder();
+      for (Map.Entry<String, Throwable> e : errors.entrySet()) {
+        sb.append(String.format("%s:\n%s\n", e.getKey(), e.getValue().toString()));
+      }
+      fail(sb.toString());
+    }
+  }
+}

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java?rev=897076&r1=897075&r2=897076&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
(original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java
Fri Jan  8 01:49:37 2010
@@ -1285,7 +1285,7 @@
           attempt.setLocation(host.makeLoggedLocation());
         }
 
-        ArrayList<LoggedLocation> locs = task.getPreferredLocations();
+        List<LoggedLocation> locs = task.getPreferredLocations();
 
         if (host != null && locs != null) {
           for (LoggedLocation loc : locs) {

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java?rev=897076&r1=897075&r2=897076&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java Fri
Jan  8 01:49:37 2010
@@ -18,7 +18,10 @@
 package org.apache.hadoop.tools.rumen;
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -41,12 +44,15 @@
  * 
  */
 public class LoggedLocation implements DeepCompare {
+   static final Map<List<String>, List<String>> layersCache = 
+    new HashMap<List<String>, List<String>>();
+
   /**
    * The full path from the root of the network to the host.
    * 
    * NOTE that this assumes that the network topology is a tree.
    */
-  List<String> layers = new ArrayList<String>();
+  List<String> layers = Collections.emptyList();
 
   static private Set<String> alreadySeenAnySetterAttributes =
       new TreeSet<String>();
@@ -56,7 +62,26 @@
   }
 
   void setLayers(List<String> layers) {
-    this.layers = layers;
+    if (layers == null || layers.isEmpty()) {
+      this.layers = Collections.emptyList();
+    } else {
+      synchronized (layersCache) {
+        List<String> found = layersCache.get(layers);
+        if (found == null) {
+          // make a copy with interned string.
+          List<String> clone = new ArrayList<String>(layers.size());
+          for (String s : layers) {
+            clone.add(s.intern());
+          }
+          // making it read-only as we are sharing them.
+          List<String> readonlyLayers = Collections.unmodifiableList(clone);
+          layersCache.put(readonlyLayers, readonlyLayers);
+          this.layers = readonlyLayers;
+        } else {
+          this.layers = found;
+        }
+      }
+    }
   }
 
   @SuppressWarnings("unused")

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java?rev=897076&r1=897075&r2=897076&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java (original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java Fri Jan
 8 01:49:37 2010
@@ -18,6 +18,7 @@
 package org.apache.hadoop.tools.rumen;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
@@ -44,9 +45,7 @@
   Pre21JobHistoryConstants.Values taskType;
   Pre21JobHistoryConstants.Values taskStatus;
   List<LoggedTaskAttempt> attempts = new ArrayList<LoggedTaskAttempt>();
-
-  ArrayList<LoggedLocation> preferredLocations =
-      new ArrayList<LoggedLocation>();
+  List<LoggedLocation> preferredLocations = Collections.emptyList();
 
   int numberMaps = -1;
   int numberReduces = -1;
@@ -139,15 +138,23 @@
   }
 
   void setAttempts(List<LoggedTaskAttempt> attempts) {
-    this.attempts = attempts;
+    if (attempts == null) {
+      this.attempts = new ArrayList<LoggedTaskAttempt>();
+    } else {
+      this.attempts = attempts;
+    }
   }
 
-  public ArrayList<LoggedLocation> getPreferredLocations() {
+  public List<LoggedLocation> getPreferredLocations() {
     return preferredLocations;
   }
 
-  void setPreferredLocations(ArrayList<LoggedLocation> preferredLocations) {
-    this.preferredLocations = preferredLocations;
+  void setPreferredLocations(List<LoggedLocation> preferredLocations) {
+    if (preferredLocations == null || preferredLocations.isEmpty()) {
+      this.preferredLocations = Collections.emptyList();
+    } else {
+      this.preferredLocations = preferredLocations;
+    }
   }
 
   public int getNumberMaps() {
@@ -213,8 +220,8 @@
     }
   }
 
-  private void compareLoggedLocations(ArrayList<LoggedLocation> c1,
-      ArrayList<LoggedLocation> c2, TreePath loc, String eltname)
+  private void compareLoggedLocations(List<LoggedLocation> c1,
+      List<LoggedLocation> c2, TreePath loc, String eltname)
       throws DeepInequalityException {
     if (c1 == null && c2 == null) {
       return;

Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java?rev=897076&r1=897075&r2=897076&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
(original)
+++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java
Fri Jan  8 01:49:37 2010
@@ -140,7 +140,7 @@
   }
 
   void setHostName(String hostName) {
-    this.hostName = hostName;
+    this.hostName = hostName.intern();
   }
 
   public long getHdfsBytesRead() {



Mime
View raw message