hadoop-hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zs...@apache.org
Subject svn commit: r797704 - in /hadoop/hive/trunk: CHANGES.txt ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java ql/src/test/results/clientpositive/nullgroup5.q.out
Date Sat, 25 Jul 2009 01:12:16 GMT
Author: zshao
Date: Sat Jul 25 01:12:16 2009
New Revision: 797704

URL: http://svn.apache.org/viewvc?rev=797704&view=rev
Log:
HIVE-689. Dump more memory stat periodically. (Namit Jain via zshao)

Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
    hadoop/hive/trunk/ql/src/test/results/clientpositive/nullgroup5.q.out

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=797704&r1=797703&r2=797704&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Sat Jul 25 01:12:16 2009
@@ -158,6 +158,8 @@
 
     HIVE-677. Operators to show number of rows forwarded. (Namit Jain via zshao)
 
+    HIVE-689. Dump more memory stat periodically. (Namit Jain via zshao)
+
   OPTIMIZATIONS
 
     HIVE-279. Predicate Pushdown support (Prasad Chakka via athusoo).

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java?rev=797704&r1=797703&r2=797704&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java Sat Jul 25 01:12:16 2009
@@ -20,6 +20,8 @@
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
 import java.net.URLClassLoader;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -50,7 +52,16 @@
   public static final Log l4j = LogFactory.getLog("ExecMapper");
   private static boolean done;
 
+  // used to log memory usage periodically
+  private MemoryMXBean memoryMXBean;
+  private long numRows = 0;
+  private long nextCntr = 1;
+  
   public void configure(JobConf job) {
+    // Allocate the bean at the beginning - 
+    memoryMXBean = ManagementFactory.getMemoryMXBean();
+    l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
+      
     try {
       l4j.info("conf classpath = " 
           + Arrays.asList(((URLClassLoader)job.getClassLoader()).getURLs()));
@@ -114,7 +125,9 @@
       if (fetchOperators != null) {
         try {
           mapredLocalWork localWork = mo.getConf().getMapLocalWork();
+          int fetchOpNum = 0;
             for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
+            int fetchOpRows = 0;
             String alias = entry.getKey();
             FetchOperator fetchOp = entry.getValue();
             Operator<? extends Serializable> forwardOp = localWork.getAliasToWork().get(alias);

@@ -124,9 +137,13 @@
               if (row == null) {
                 break;
               }
-
+              fetchOpRows++;
               forwardOp.process(row.o, 0);
             }
+            
+            if (l4j.isInfoEnabled()) {
+              l4j.info("fetch " + fetchOpNum++ + " processed " + fetchOpRows + " used mem: " + memoryMXBean.getHeapMemoryUsage().getUsed());
+            }
           }
         } catch (Throwable e) {
           abort = true;
@@ -143,9 +160,18 @@
     try {
       if (mo.getDone())
         done = true;
-      else
+      else {
         // Since there is no concept of a group, we don't invoke startGroup/endGroup for a mapper
         mo.process((Writable)value);
+        if (l4j.isInfoEnabled()) {
+          numRows++;
+          if (numRows == nextCntr) {
+            long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
+            l4j.info("ExecMapper: processing " + numRows + " rows: used memory = " + used_memory);
+            nextCntr = getNextCntr(numRows);
+          }
+        }
+      }
     } catch (Throwable e) {
       abort = true;
       e.printStackTrace();
@@ -158,6 +184,15 @@
     }
   }
 
+  private long getNextCntr(long cntr) {
+    // A very simple counter to keep track of number of rows processed by the reducer. It dumps
+    // every 1 million times, and quickly before that
+    if (cntr >= 1000000)
+      return cntr + 1000000;
+    
+    return 10 * cntr;
+  }
+
   public void close() {
     // No row was processed
     if(oc == null) {
@@ -175,7 +210,12 @@
           forwardOp.close(abort);
         }
       }
-
+      
+      if (l4j.isInfoEnabled()) {
+        long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
+        l4j.info("ExecMapper: processed " + numRows + " rows: used memory = " + used_memory);
+      }
+      
       reportStats rps = new reportStats (rp);
       mo.preorderMap(rps);
       return;

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java?rev=797704&r1=797703&r2=797704&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java Sat Jul 25 01:12:16 2009
@@ -19,6 +19,9 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
 import java.net.URLClassLoader;
 import java.util.*;
 
@@ -55,7 +58,10 @@
 
   private static String [] fieldNames;
   public static final Log l4j = LogFactory.getLog("ExecReducer");
-
+  
+  // used to log memory usage periodically
+  private MemoryMXBean memoryMXBean;
+  
   // TODO: move to DynamicSerDe when it's ready
   private Deserializer inputKeyDeserializer;
   // Input value serde needs to be an array to support different SerDe 
@@ -73,6 +79,11 @@
     ObjectInspector[] rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
     ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
     ObjectInspector keyObjectInspector;
+
+    // Allocate the bean at the beginning - 
+    memoryMXBean = ManagementFactory.getMemoryMXBean();
+    l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
+    
     try {
       l4j.info("conf classpath = " 
           + Arrays.asList(((URLClassLoader)job.getClassLoader()).getURLs()));
@@ -185,10 +196,13 @@
         row.add(valueObject[tag.get()]);
         // The tag is not used any more, we should remove it.
         row.add(tag);
-        cntr++;
-        if (cntr == nextCntr) {
-          l4j.info("ExecReducer: processing " + cntr + " rows");
-          nextCntr = getNextCntr(cntr);
+        if (l4j.isInfoEnabled()) {
+          cntr++;
+          if (cntr == nextCntr) {
+            long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
+            l4j.info("ExecReducer: processing " + cntr + " rows: used memory = " + used_memory);
+            nextCntr = getNextCntr(cntr);
+          }
         }
         reducer.process(row, tag.get());
       }
@@ -226,7 +240,10 @@
         l4j.trace("End Group");
         reducer.endGroup();
       }
-      l4j.info("ExecReducer: processed " + cntr + " rows");
+      if (l4j.isInfoEnabled()) {
+        l4j.info("ExecReducer: processed " + cntr + " rows: used memory = " + memoryMXBean.getHeapMemoryUsage().getUsed());
+      }
+      
       reducer.close(abort);
       reportStats rps = new reportStats (rp);
       reducer.preorderMap(rps);

Modified: hadoop/hive/trunk/ql/src/test/results/clientpositive/nullgroup5.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientpositive/nullgroup5.q.out?rev=797704&r1=797703&r2=797704&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientpositive/nullgroup5.q.out (original)
+++ hadoop/hive/trunk/ql/src/test/results/clientpositive/nullgroup5.q.out Sat Jul 25 01:12:16 2009
@@ -37,6 +37,7 @@
                         type: string
                         expr: value
                         type: string
+                  outputColumnNames: _col0, _col1
                   Union
                     Select Operator
                       expressions:
@@ -44,6 +45,7 @@
                             type: string
                             expr: _col1
                             type: string
+                      outputColumnNames: _col0, _col1
                       File Output Operator
                         compressed: false
                         GlobalTableId: 0
@@ -65,6 +67,7 @@
                         type: string
                         expr: value
                         type: string
+                  outputColumnNames: _col0, _col1
                   Union
                     Select Operator
                       expressions:
@@ -72,6 +75,7 @@
                             type: string
                             expr: _col1
                             type: string
+                      outputColumnNames: _col0, _col1
                       File Output Operator
                         compressed: false
                         GlobalTableId: 0
@@ -91,7 +95,7 @@
   select key, value from tstparttbl2 y where y.ds='2009-04-09'
 )u
 Input: default/tstparttbl2/ds=2009-04-09
-Output: file:/data/users/njain/deploy/hive1/tools/ahive1-trunk-apache-hive/build/ql/tmp/910140009/10000
+Output: file:/data/users/zshao/tools/namit-trunk-apache-hive/build/ql/tmp/248088196/10000
 238	val_238
 86	val_86
 311	val_311



Mime
View raw message