hadoop-hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From na...@apache.org
Subject svn commit: r822051 - in /hadoop/hive/trunk: CHANGES.txt ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
Date Mon, 05 Oct 2009 22:15:04 GMT
Author: namit
Date: Mon Oct  5 22:15:03 2009
New Revision: 822051

URL: http://svn.apache.org/viewvc?rev=822051&view=rev
Log:
HIVE-865. Map Join memory leak
(Ning Zhang via namit)


Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=822051&r1=822050&r2=822051&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Mon Oct  5 22:15:03 2009
@@ -167,6 +167,9 @@
     HIVE-859. Clean HWI build.xml
     (Edward Capriolo via namit)
 
+    HIVE-865. Map Join memory leak
+    (Ning Zhang via namit)
+
 Release 0.4.0 -  Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=822051&r1=822050&r2=822051&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Mon Oct  5 22:15:03 2009
@@ -74,6 +74,7 @@
   transient int mapJoinRowsKey;            // rows for a given key
   
   transient protected Map<Byte, HTree> mapJoinTables;
+  RecordManager  recman = null;
 
   public static class MapJoinObjectCtx {
     ObjectInspector standardOI;
@@ -118,7 +119,7 @@
   transient List<File> hTables;
   transient int      numMapRowsRead;
   transient int      heartbeatInterval;
-  
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
@@ -150,7 +151,7 @@
           continue;
         
         Properties props = new Properties();
-        props.setProperty(RecordManagerOptions.CACHE_SIZE, 
+        props.setProperty(RecordManagerOptions.CACHE_SIZE,
           String.valueOf(HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINCACHEROWS)));
         
         Random rand = new Random();
@@ -165,7 +166,9 @@
           newDir = new File("/tmp" + rand.nextInt());
         }
         
-        RecordManager recman = RecordManagerFactory.createRecordManager(newDirName + "/"
+ pos, props );
+        // we don't need transaction since atomicity is handled at the higher level
+        props.setProperty(RecordManagerOptions.DISABLE_TRANSACTIONS, "true" );
+        recman = RecordManagerFactory.createRecordManager(newDirName + "/" + pos, props );
         HTree hashTable = HTree.createInstance(recman);
         
         mapJoinTables.put(Byte.valueOf((byte)pos), hashTable);
@@ -271,11 +274,19 @@
 
         // This may potentially increase the size of the hashmap on the mapper
         if (res.size() > mapJoinRowsKey) {
-          LOG.warn("Number of values for a given key " + keyObj + " are " + res.size());
-          LOG.warn("used memory " + Runtime.getRuntime().totalMemory());
+          if ( res.size() % 100 == 0 ) {
+            LOG.warn("Number of values for a given key " + keyObj + " are " + res.size());
+            LOG.warn("used memory " + Runtime.getRuntime().totalMemory());
+          }
         }
         
         hashTable.put(keyObj, valueObj);
+        
+        // commit every 100 rows to prevent Out-of-memory exception
+        if ( (res.size() % 100 == 0) && recman != null ) {
+          recman.commit();
+        }
+
         return;
       }
 



Mime
View raw message