tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject tez git commit: TEZ-2203. Intern strings in tez counters (bikas)
Date Fri, 20 Mar 2015 19:27:09 GMT
Repository: tez
Updated Branches:
  refs/heads/master 9b845f296 -> 6e15b2f6d


TEZ-2203. Intern strings in tez counters (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6e15b2f6
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6e15b2f6
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6e15b2f6

Branch: refs/heads/master
Commit: 6e15b2f6dac8471d266b8f313038f3767fedab04
Parents: 9b845f2
Author: Bikas Saha <bikas@apache.org>
Authored: Fri Mar 20 12:27:02 2015 -0700
Committer: Bikas Saha <bikas@apache.org>
Committed: Fri Mar 20 12:27:02 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../common/counters/AbstractCounterGroup.java   |   7 +-
 .../common/counters/FileSystemCounterGroup.java |   9 +-
 .../common/counters/FrameworkCounterGroup.java  |   3 +-
 .../tez/common/counters/GenericCounter.java     |  12 +-
 .../tez/dag/app/TestMockDAGAppMaster.java       | 111 ++++++++++++++++++-
 6 files changed, 128 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/6e15b2f6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 759504d..6045975 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2203. Intern strings in tez counters
   TEZ-2202. Fix LocalTaskExecutionThread ID to the standard thread numbering.
   TEZ-2059. Remove TaskEventHandler in TestDAGImpl
   TEZ-2191. Simulation improvements to MockDAGAppMaster

http://git-wip-us.apache.org/repos/asf/tez/blob/6e15b2f6/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java b/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java
index 7e791d7..a4b153f 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.StringInterner;
 
 import com.google.common.collect.Iterators;
 
@@ -49,8 +50,8 @@ public abstract class AbstractCounterGroup<T extends TezCounter>
 
   public AbstractCounterGroup(String name, String displayName,
                               Limits limits) {
-    this.name = name;
-    this.displayName = displayName;
+    this.name = StringInterner.weakIntern(name);
+    this.displayName = StringInterner.weakIntern(displayName);
     this.limits = limits;
   }
 
@@ -160,7 +161,7 @@ public abstract class AbstractCounterGroup<T extends TezCounter>
 
   @Override
   public synchronized void readFields(DataInput in) throws IOException {
-    displayName = Text.readString(in);
+    displayName = StringInterner.weakIntern(Text.readString(in));
     counters.clear();
     int size = WritableUtils.readVInt(in);
     for (int i = 0; i < size; i++) {

http://git-wip-us.apache.org/repos/asf/tez/blob/6e15b2f6/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java
index 07df3fb..771f523 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java
@@ -29,13 +29,16 @@ import java.util.Locale;
 import java.util.Map;
 
 import com.google.common.base.Joiner;
+
 import static com.google.common.base.Preconditions.*;
+
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Maps;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.StringInterner;
 
 /**
  * An abstract class to provide common implementation of the filesystem
@@ -54,7 +57,7 @@ public abstract class FileSystemCounterGroup<C extends TezCounter>
   // Just a few local casts probably worth not having to carry it around.
   private final Map<String, Object[]> map =
     new ConcurrentSkipListMap<String, Object[]>();
-  private String displayName = "File System Counters";
+  private String displayName = StringInterner.weakIntern("File System Counters");
 
   private static final Joiner NAME_JOINER = Joiner.on('_');
 
@@ -65,7 +68,7 @@ public abstract class FileSystemCounterGroup<C extends TezCounter>
     private long value;
 
     public FSCounter(String scheme, FileSystemCounter ref) {
-      this.scheme = scheme;
+      this.scheme = scheme; // this is interned in the checkScheme() method via a map
       key = ref;
     }
 
@@ -122,7 +125,7 @@ public abstract class FileSystemCounterGroup<C extends TezCounter>
 
   @Override
   public void setDisplayName(String displayName) {
-    this.displayName = displayName;
+    this.displayName = StringInterner.weakIntern(displayName);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/6e15b2f6/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java b/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java
index 66b6e33..3a4aa97 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java
@@ -26,7 +26,6 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Iterator;
 
-import com.google.common.base.Joiner;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.WritableUtils;
 
@@ -60,7 +59,7 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>,
 
     public FrameworkCounter(T ref, String groupName) {
       key = ref;
-      this.groupName = groupName;
+      this.groupName = groupName; // this is interned in the fmap/i2s of CounterGroupFactory
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/6e15b2f6/tez-api/src/main/java/org/apache/tez/common/counters/GenericCounter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/GenericCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/GenericCounter.java
index 5477606..4bb4c76 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/GenericCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/GenericCounter.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.StringInterner;
 
 /**
  * A generic counter implementation
@@ -41,13 +42,12 @@ public class GenericCounter extends AbstractCounter {
   }
 
   public GenericCounter(String name, String displayName) {
-    this.name = name;
-    this.displayName = displayName;
+    this(name, displayName, 0);
   }
 
   public GenericCounter(String name, String displayName, long value) {
-    this.name = name;
-    this.displayName = displayName;
+    this.name = StringInterner.weakIntern(name);
+    this.displayName = StringInterner.weakIntern(displayName);
     this.value = value;
   }
 
@@ -58,8 +58,8 @@ public class GenericCounter extends AbstractCounter {
 
   @Override
   public synchronized void readFields(DataInput in) throws IOException {
-    name = Text.readString(in);
-    displayName = in.readBoolean() ? Text.readString(in) : name;
+    name = StringInterner.weakIntern(Text.readString(in));
+    displayName = in.readBoolean() ? StringInterner.weakIntern(Text.readString(in)) : name;
     value = WritableUtils.readVLong(in);
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6e15b2f6/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index a7feef8..822fe7f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -18,7 +18,11 @@
 
 package org.apache.tez.dag.app;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -26,11 +30,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
@@ -64,6 +72,7 @@ import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import com.google.common.collect.Maps;
@@ -245,6 +254,12 @@ public class TestMockDAGAppMaster {
             Edge.create(vA, vB, EdgeProperty.create(DataMovementType.SCATTER_GATHER,
                 DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
                 OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
+    TezCounters temp = new TezCounters();
+    temp.findCounter(new String(globalCounterName), new String(globalCounterName)).increment(1);
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(bos);
+    temp.write(out);
+    final byte[] payload = bos.toByteArray();
 
     MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
     MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
@@ -254,7 +269,15 @@ public class TestMockDAGAppMaster {
       public TezCounters getCounters(TaskSpec taskSpec) {
         String vName = taskSpec.getVertexName();
         TezCounters counters = new TezCounters();
-        counters.findCounter(globalCounterName, globalCounterName).increment(1);
+        final DataInputByteBuffer in  = new DataInputByteBuffer();
+        in.reset(ByteBuffer.wrap(payload));
+        try {
+          // this ensures that the serde code path is covered.
+          // the internal merges of counters covers the constructor code path.
+          counters.readFields(in);
+        } catch (IOException e) {
+          Assert.fail(e.getMessage());
+        }
         counters.findCounter(vName, procCounterName).increment(1);
         for (OutputSpec output : taskSpec.getOutputs()) {
           counters.findCounter(vName, output.getDestinationVertexName()).increment(1);
@@ -281,7 +304,93 @@ public class TestMockDAGAppMaster {
     Assert.assertEquals(1, counters.findCounter(vBName, vAName).getValue());
     // verify global counters
     Assert.assertEquals(11, counters.findCounter(globalCounterName, globalCounterName).getValue());
+    VertexImpl vAImpl = (VertexImpl) dagImpl.getVertex(vAName);
+    VertexImpl vBImpl = (VertexImpl) dagImpl.getVertex(vBName);
+    TezCounters vACounters = vAImpl.getAllCounters();
+    TezCounters vBCounters = vBImpl.getAllCounters();
+    String vACounterName = vACounters.findCounter(globalCounterName, globalCounterName).getName();
+    String vBCounterName = vBCounters.findCounter(globalCounterName, globalCounterName).getName();
+    if (vACounterName != vBCounterName) {
+      Assert.fail("String counter name objects dont match despite interning.");
+    }
+    CounterGroup vaGroup = vACounters.getGroup(globalCounterName);
+    String vaGrouName = vaGroup.getName();
+    CounterGroup vBGroup = vBCounters.getGroup(globalCounterName);
+    String vBGrouName = vBGroup.getName();
+    if (vaGrouName != vBGrouName) {
+      Assert.fail("String group name objects dont match despite interning.");
+    }
+    
+    tezClient.stop();
+  }
+  
+  private void checkMemory(String name, MockDAGAppMaster mockApp) {                
+    long mb = 1024*1024;                                                           
+                                                                                   
+    //Getting the runtime reference from system                                    
+    Runtime runtime = Runtime.getRuntime();                                        
+                                                                                   
+    System.out.println("##### Heap utilization statistics [MB] for " + name);      
+                                                                                   
+    runtime.gc();                                                                  
+                                                                                   
+    //Print used memory                                                            
+    System.out.println("##### Used Memory:"                                        
+        + (runtime.totalMemory() - runtime.freeMemory()) / mb);                    
+                                                                                    
+    //Print free memory                                                            
+    System.out.println("##### Free Memory:"                                        
+        + runtime.freeMemory() / mb);                                              
+                                                                                   
+    //Print total available memory                                                 
+    System.out.println("##### Total Memory:" + runtime.totalMemory() / mb);        
+                                                                                   
+    //Print Maximum available memory                                               
+    System.out.println("##### Max Memory:" + runtime.maxMemory() / mb);            
+  }  
+
+  @Ignore
+  @Test (timeout = 60000)
+  public void testBasicCounterMemory() throws Exception {
+    Logger.getRootLogger().setLevel(Level.WARN);
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null,
+        null, false, false);
+    tezClient.start();
+
+    final String vAName = "A";
     
+    DAG dag = DAG.create("testBasicCounters");
+    Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), 10000);
+    dag.addVertex(vA);
+
+    MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+    MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+    mockLauncher.startScheduling(false);
+    mockApp.countersDelegate = new CountersDelegate() {
+      @Override
+      public TezCounters getCounters(TaskSpec taskSpec) {
+        TezCounters counters = new TezCounters();
+        final String longName = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz";
+        final String shortName = "abcdefghijklmnopqrstuvwxyz";
+        for (int i=0; i<6; ++i) {
+          for (int j=0; j<15; ++j) {
+            counters.findCounter((i + longName), (i + (shortName))).increment(1);
+          }
+        }
+        return counters;
+      }
+    };
+    mockApp.doSleep = false;
+    DAGClient dagClient = tezClient.submitDAG(dag);
+    mockLauncher.waitTillContainersLaunched();
+    DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+    mockLauncher.startScheduling(true);
+    DAGStatus status = dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState());
+    TezCounters counters = dagImpl.getAllCounters();
+    Assert.assertNotNull(counters);
+    checkMemory(dag.getName(), mockApp);
     tezClient.stop();
   }
   


Mime
View raw message