hadoop-hive-commits mailing list archives

From na...@apache.org
Subject svn commit: r889097 - in /hadoop/hive/trunk: ./ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/ ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/ ql/src/test/org/apache/hadoop/hive/ql/exec/
Date Thu, 10 Dec 2009 05:49:25 GMT
Author: namit
Date: Thu Dec 10 05:49:24 2009
New Revision: 889097

URL: http://svn.apache.org/viewvc?rev=889097&view=rev
Log:
HIVE-968. remove mapjoin files and use a MRU cache to optimize lookups
(Ning Zhang via namit)


Added:
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DCLLItem.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashMapWrapper.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MRU.java
    hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHashMapWrapper.java
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/RecordManagerFactory.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/RecordManagerOptions.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/RecordManagerProvider.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/BaseRecordManager.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/Provider.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordFile.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/TransactionManager.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=889097&r1=889096&r2=889097&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Thu Dec 10 05:49:24 2009
@@ -132,6 +132,9 @@
     HIVE-931. optimize group by to use sorting and grouping properties
     (He Yongqiang via namit)
 
+    HIVE-968. remove mapjoin files and use a MRU cache to optimize lookups
+    (Ning Zhang via namit)
+
   OPTIMIZATIONS
 
   BUG FIXES

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DCLLItem.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DCLLItem.java?rev=889097&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DCLLItem.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DCLLItem.java Thu Dec 10 05:49:24 2009
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+/**
+ *  Doubly circular linked list item.
+ */
+public class DCLLItem {
+  
+  DCLLItem prev;
+  DCLLItem next;
+  
+  DCLLItem() {
+    prev = next = this;
+  }
+  
+  /**
+   * Get the next item.
+   * @return the next item.
+   */
+  public DCLLItem getNext() { 
+    return next; 
+  }
+  
+  /**
+   * Get the previous item.
+   * @return the previous item.
+   */
+  public DCLLItem getPrev() { 
+    return prev; 
+  }
+  
+  /**
+   * Set the next item as itm.
+   * @param itm the item to be set as next.
+   */
+  public void setNext(DCLLItem itm) { 
+    next = itm; 
+  }
+  
+  /**
+   * Set the previous item as itm.
+   * @param itm the item to be set as previous.
+   */
+  public void setPrev(DCLLItem itm) { 
+    prev = itm; 
+  }
+  
+  /**
+   * Remove the current item from the doubly circular linked list.
+   */
+  public void remove() {
+    next.prev = this.prev;
+    prev.next = this.next;
+    this.prev = this.next = null;
+  }
+  
+  /**
+   * Add v as the previous of the current list item.
+   * @param v inserted item.
+   */
+  public void insertBefore(DCLLItem v) {
+    this.prev.next = v;
+    v.prev = this.prev;
+    v.next = this;
+    this.prev = v;
+  }
+  
+  /**
+   * Add v as the next of the current list item.
+   * @param v inserted item.
+   */
+  public void insertAfter(DCLLItem v) {
+    this.next.prev = v;
+    v.next = this.next;
+    v.prev = this;
+    this.next = v;
+  }
+}
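
For illustration only (not part of the commit): a minimal sketch of how the DCLLItem links behave. It assumes code running in the same package, since the constructor is package-private, and the item names are made up.

    DCLLItem a = new DCLLItem();   // single-item ring: a.getNext() == a.getPrev() == a
    DCLLItem b = new DCLLItem();
    a.insertAfter(b);              // ring is now a <-> b
    DCLLItem c = new DCLLItem();
    a.insertBefore(c);             // c becomes a's previous: a -> b -> c -> a
    b.remove();                    // b is unlinked; ring is a <-> c

The MRU class below relies on exactly these operations to keep its list in most-recently-used order.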

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashMapWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashMapWrapper.java?rev=889097&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashMapWrapper.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashMapWrapper.java Thu Dec 10 05:49:24 2009
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.Set;
+import java.util.HashSet;
+
+import org.apache.hadoop.hive.ql.util.jdbm.RecordManager;
+import org.apache.hadoop.hive.ql.util.jdbm.RecordManagerFactory;
+import org.apache.hadoop.hive.ql.util.jdbm.RecordManagerOptions;
+import org.apache.hadoop.hive.ql.util.jdbm.htree.HTree;
+import org.apache.hadoop.hive.ql.util.jdbm.helper.FastIterator;
+import org.apache.hadoop.hive.ql.exec.MRU;
+import org.apache.hadoop.hive.ql.exec.DCLLItem;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Simple wrapper for a persistent HashMap implementing only the put/get/remove/clear interface.
+ * The main memory hash table acts as a cache and all put/get will operate on it first. If the
+ * size of the main memory hash table exceeds a certain threshold, new elements will go into
+ * the persistent hash table.
+ */
+public class HashMapWrapper<K,V> {
+  
+  protected Log LOG = LogFactory.getLog(this.getClass().getName());
+  
+  // default threshold for using main memory based HashMap
+  private static final int THRESHOLD = 25000;
+  
+  private int threshold;             // threshold to put data into persistent hash table instead
+  private HashMap<K,MRUItem> mHash;  // main memory HashMap
+  private HTree pHash;               // persistent HashMap
+  private RecordManager recman;      // record manager required by HTree
+  private File tmpFile;              // temp file holding the persistent data from record manager.
+  private MRU<MRUItem> MRUList;      // MRU cache entry
+  
+  /**
+   * Doubly linked list of value items.
+   * Note: this is only used along with the main memory hash table. The persistent hash table stores the value directly.
+   */
+  class MRUItem extends DCLLItem {
+    K key;
+    V value;
+    
+    MRUItem(K k, V v) {
+      key = k;
+      value = v;
+    }
+  }
+  
+  /**
+   * Constructor.
+   * @param threshold User specified threshold to store new values into persistent storage.
+   */
+  public HashMapWrapper(int threshold) {
+    this.threshold = threshold;
+    this.pHash = null;
+    this.recman = null;
+    mHash = new HashMap<K,MRUItem>();
+    MRUList = new MRU<MRUItem>();
+  }
+  
+  public HashMapWrapper () {
+    this(THRESHOLD);
+  }
+  
+  /**
+   * Get the value based on the key. We try to get it from the main memory hash table first.
+   * If it is not there, we look it up in the persistent hash table. This function also guarantees
+   * that if an item is found for a given key, it is available in the main memory HashMap, so mutating
+   * the returned value will be reflected (saved) in the HashMapWrapper.
+   * @param key
+   * @return Value corresponding to the key. If the key is not found, return null.
+   */
+  public V get(K key) throws HiveException {
+    V value = null;
+    
+    // search the main memory hash table first.
+    MRUItem item = mHash.get(key);
+    if ( item != null ) {
+      value = item.value;
+      MRUList.moveToHead(item);
+    } else  if ( pHash != null ) {
+      try {
+        value = (V) pHash.get(key);
+        if ( value != null ) { 
+          if ( mHash.size() < threshold ) {
+            mHash.put(key, new MRUItem(key, value));
+            pHash.remove(key);
+          } else if ( threshold > 0 ) { // flush the LRU to disk
+            MRUItem tail = MRUList.tail(); // least recently used item
+            pHash.put(tail.key, tail.value);
+            pHash.remove(key);
+            recman.commit();
+
+            // update mHash -- reuse MRUItem
+            item = mHash.remove(tail.key);
+            item.key = key;
+            item.value = value;
+            mHash.put(key, item);
+
+            // update MRU -- reusing MRUItem
+            tail.key = key;
+            tail.value = value;
+            MRUList.moveToHead(tail);
+          }
+        }
+      } catch ( Exception e ) {
+        LOG.warn(e.toString());
+        throw new HiveException(e);
+      }
+    } 
+    return value;
+  }
+  
+  /**
+   * Put the key value pair in the hash table. It will first try to 
+   * put it into the main memory hash table. If the size exceeds the
+   * threshold, it will put it into the persistent hash table.
+   * @param key
+   * @param value
+   * @throws HiveException
+   */
+  public void put(K key, V value)  throws HiveException {
+    int mm_size = mHash.size();
+    MRUItem itm = mHash.get(key);
+    
+    if (mm_size < threshold) {
+      if ( itm != null ) {
+        // re-use the MRU item -- just overwrite value, key is the same
+        itm.value = value;
+        MRUList.moveToHead(itm);
+        if (!mHash.get(key).value.equals(value))
+          LOG.error("HashMapWrapper.put() reuse MRUItem inconsistency [1].");
+        assert(mHash.get(key).value.equals(value));
+      } else {
+        // check if key already exists in pHash
+        try {
+          if ( pHash != null && pHash.get(key) != null ) {
+            // remove the old item from pHash and insert the new one
+            pHash.remove(key);
+            pHash.put(key, value);
+            recman.commit();
+            return;
+          }
+        } catch (Exception e) {
+          e.printStackTrace();
+          throw new HiveException(e);
+        }
+        itm = new MRUItem(key,value);
+        MRUList.put(itm);
+        mHash.put(key, itm);
+      }
+    } else {
+      if ( itm != null ) { // replace existing item
+        // re-use the MRU item -- just overwrite value, key is the same
+        itm.value = value;
+        MRUList.moveToHead(itm);
+        if (!mHash.get(key).value.equals(value))
+          LOG.error("HashMapWrapper.put() reuse MRUItem inconsistency [2].");
+        assert(mHash.get(key).value.equals(value));
+      } else {
+        // for items inserted into persistent hash table, we don't put it into MRU
+        try {
+          if (pHash == null) {
+            // Create a temporary file for the page manager to hold persistent data.
+            // It is deleted if the JVM terminates normally.
+            // Caveat: it won't be deleted if the JVM is killed by 'kill -9'.
+            if ( tmpFile != null )
+              tmpFile.delete();
+            tmpFile = File.createTempFile("HashMapWrapper", ".tmp");
+            tmpFile.deleteOnExit();
+
+            Properties props = new Properties();
+            props.setProperty(RecordManagerOptions.CACHE_TYPE, RecordManagerOptions.NO_CACHE);
+            props.setProperty(RecordManagerOptions.DISABLE_TRANSACTIONS, "true");
+
+            recman = RecordManagerFactory.createRecordManager(tmpFile, props);
+            pHash = HTree.createInstance(recman);
+          }
+          pHash.put(key, value);
+          recman.commit();
+        } catch (Exception e) {
+          LOG.warn(e.toString());
+          throw new HiveException(e);
+        } 
+      }
+    }
+  }
+  
+  /**
+   * Clean up the hash table. All elements in the main memory hash table will be removed, and
+   * the persistent hash table will be destroyed (temporary file will be deleted).
+   */
+  public void clear() throws HiveException {
+    if ( mHash != null ) {
+      mHash.clear();
+      MRUList.clear();
+    }
+    close();
+  }
+  
+  /**
+   * Remove one key-value pair from the hash table based on the given key. If the pair is
+   * removed from the main memory hash table, pairs in the persistent hash table will not be
+   * moved to the main memory hash table. Future inserted elements will still go into the main
+   * memory hash table, though.
+   * @param key
+   * @throws HiveException
+   */
+  public void remove(Object key) throws HiveException {
+    MRUItem entry = mHash.remove(key);
+    if ( entry != null ) {
+      MRUList.remove(entry);
+    } else if ( pHash != null ) {
+      try {
+        pHash.remove(key);
+      } catch (Exception e) {
+        LOG.warn(e.toString());
+        throw new HiveException(e);
+      }
+    }
+  }
+  
+  /**
+   * Signals that there will be no more puts to the hash map before it is destroyed or cleared.
+   * This is used to optimize the MRU list maintenance cost.
+   */
+  public void noMorePut() {
+    if ( pHash == null ) { // all data in main memory, no need MRU
+    }
+  }
+  
+  /**
+   * Get a set of all keys in the hash map.
+   * @return the set of keys in both the main memory and persistent hash tables.
+   */
+  public Set<K> keySet() {
+    HashSet<K> ret = null;
+    if ( mHash != null ) {
+      ret = new HashSet<K>();
+      ret.addAll(mHash.keySet());
+    }
+    if ( pHash != null ) {
+      try {
+        FastIterator fitr = pHash.keys();
+        if ( fitr != null ) {
+          K k;
+          while ( (k = (K) fitr.next()) != null )
+            ret.add(k);
+        }
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+    return ret;
+  }
+  
+  /**
+   * Get the main memory cache capacity. 
+   * @return the maximum number of items that can be put into the main memory HashMap cache.
+   */
+  public int cacheSize() {
+    return threshold;
+  }
+  
+  /**
+   * Close the persistent hash table and clean it up.
+   * @throws HiveException
+   */
+  public void close() throws HiveException {
+    
+    if ( pHash != null ) {
+      try {
+        if ( recman != null )
+          recman.close();
+      }  catch (Exception e) {
+        throw new HiveException(e);
+      }
+      // delete the temporary file
+      tmpFile.delete();
+      tmpFile = null;
+      pHash   = null;
+      recman  = null;
+    }
+  }
+}
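
A minimal usage sketch (not part of the commit) of the new wrapper, mirroring what TestHashMapWrapper below exercises; it assumes a caller that can propagate HiveException, and the keys/values are made up.

    HashMapWrapper<String, String> cache = new HashMapWrapper<String, String>(2);
    cache.put("k1", "v1");        // stored in the in-memory HashMap (size 1 < threshold 2)
    cache.put("k2", "v2");        // in-memory HashMap is now at the threshold
    cache.put("k3", "v3");        // spills to the JDBM-backed persistent HTree
    String v = cache.get("k3");   // read from disk; the least recently used in-memory entry is flushed to the HTree
    cache.close();                // closes the record manager and deletes the temporary file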

Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MRU.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MRU.java?rev=889097&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MRU.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MRU.java Thu Dec 10 05:49:24 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.ql.exec.DCLLItem;
+
+/**
+ *  An MRU (Most Recently Used) cache implementation.
+ *  This implementation maintains a doubly circular linked list and it can be used
+ *  with an auxiliary data structure such as a HashMap to locate the item quickly.
+ */
+public class MRU<T extends DCLLItem> {
+
+  T head;   // head of the linked list -- MRU; tail (head.prev) will be the LRU
+  
+  public MRU() {
+    head = null;
+  }
+  
+  /**
+   * Insert a value into the MRU. It will appear as the head.
+   */
+  public T put(T item) {
+    addToHead(item);
+    return item;
+  }
+  
+  /**
+   * Remove an item from the MRU list.
+   * @param v linked list item.
+   */
+  public void remove(T v) {
+    if (v == null) 
+      return;
+    if ( v == head ) {
+      if ( head != head.getNext()) {
+        head = (T) head.getNext();
+      } else {
+        head = null;
+      }
+    }
+    v.remove();
+  }
+  
+  /**
+   * Get the most recently used.
+   * @return the most recently used item.
+   */
+  public T head() { 
+    return head;
+  }
+  
+  /**
+   * Get the least recently used.
+   * @return the least recently used item.
+   */
+  public T tail() {
+    return (T) head.getPrev();
+  }
+  
+  /**
+   * Insert a new item as the head.
+   * @param v the new linked list item to be added to the head.
+   */
+  private void addToHead(T v) {
+    if ( head == null ) {
+      head = v;
+    } else  {
+      head.insertBefore(v);
+      head = v;
+    }
+  }
+  
+  
+  /**
+   * Move an existing item to the head. 
+   * @param v the linked list item to be moved to the head.
+   */
+  public void moveToHead(T v) {
+    assert(head != null);
+    if ( head != v ) {
+      v.remove();
+      head.insertBefore(v);
+      head = v;
+    }
+  }
+  
+  /**
+   * Clear all elements in the MRU list.
+   * This is not very efficient (linear) since it calls remove() on every item in the list.
+   */
+  public void clear() {
+    while ( head.getNext() != head ) {
+      head.getNext().remove();
+    }
+    head.remove();
+    head = null;
+  }
+}
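
Again for illustration only: a minimal sketch of the MRU list on its own, assuming same-package access to the DCLLItem constructor.

    MRU<DCLLItem> list = new MRU<DCLLItem>();
    DCLLItem x = list.put(new DCLLItem());  // x is the head (most recently used)
    DCLLItem y = list.put(new DCLLItem());  // y is now the head, x the tail (least recently used)
    list.moveToHead(x);                     // x is the head again; y is the tail
    DCLLItem victim = list.tail();          // eviction candidate, i.e. y
    list.remove(victim);

HashMapWrapper uses this pattern directly: tail() picks the entry to flush to the persistent HTree when the in-memory table is full.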

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=889097&r1=889096&r2=889097&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Thu Dec 10 05:49:24 2009
@@ -25,8 +25,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,10 +34,6 @@
 import org.apache.hadoop.hive.ql.plan.mapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.ql.util.jdbm.RecordManager;
-import org.apache.hadoop.hive.ql.util.jdbm.RecordManagerFactory;
-import org.apache.hadoop.hive.ql.util.jdbm.RecordManagerOptions;
-import org.apache.hadoop.hive.ql.util.jdbm.htree.HTree;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -49,6 +43,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.hive.ql.exec.HashMapWrapper;
 
 /**
  * Map side Join operator implementation.
@@ -73,9 +68,8 @@
   transient private int posBigTable;       // one of the tables that is not in memory
   transient int mapJoinRowsKey;            // rows for a given key
 
-  transient protected Map<Byte, HTree> mapJoinTables;
-  RecordManager  recman = null;
-
+  transient protected Map<Byte, HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>> mapJoinTables;
+  
   public static class MapJoinObjectCtx {
     ObjectInspector standardOI;
     SerDe      serde;
@@ -126,90 +120,69 @@
     numMapRowsRead = 0;
 
     firstRow = true;
-    try {
-      heartbeatInterval = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVESENDHEARTBEAT);
-
-      joinKeys  = new HashMap<Byte, List<ExprNodeEvaluator>>();
-
-      populateJoinKeyValue(joinKeys, conf.getKeys());
-      joinKeysObjectInspectors = getObjectInspectorsFromEvaluators(joinKeys, inputObjInspectors);
-      joinKeysStandardObjectInspectors = getStandardObjectInspectors(joinKeysObjectInspectors);
-
-      // all other tables are small, and are cached in the hash table
-      posBigTable = conf.getPosBigTable();
-
-      metadataValueTag = new int[numAliases];
-      for (int pos = 0; pos < numAliases; pos++)
-        metadataValueTag[pos] = -1;
-
-      mapJoinTables = new HashMap<Byte, HTree>();
-      hTables = new ArrayList<File>();
-
-      // initialize the hash tables for other tables
-      for (int pos = 0; pos < numAliases; pos++) {
-        if (pos == posBigTable)
-          continue;
-
-        Properties props = new Properties();
-        props.setProperty(RecordManagerOptions.CACHE_SIZE,
-          String.valueOf(HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINCACHEROWS)));
-
-        Random rand = new Random();
-        File newDir = new File("/tmp/" + rand.nextInt());
-        String newDirName = null;
-        while (true) {
-          if (newDir.mkdir()) {
-            newDirName = newDir.getAbsolutePath();
-            hTables.add(newDir);
-            break;
-          }
-          newDir = new File("/tmp" + rand.nextInt());
-        }
-
-        // we don't need transaction since atomicity is handled at the higher level
-        props.setProperty(RecordManagerOptions.DISABLE_TRANSACTIONS, "true" );
-        recman = RecordManagerFactory.createRecordManager(newDirName + "/" + pos, props );
-        HTree hashTable = HTree.createInstance(recman);
-
-        mapJoinTables.put(Byte.valueOf((byte)pos), hashTable);
-      }
-
-      storage.put((byte)posBigTable, new ArrayList<ArrayList<Object>>());
-
-      mapJoinRowsKey = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINROWSIZE);
-
-      List<? extends StructField> structFields = ((StructObjectInspector)outputObjInspector).getAllStructFieldRefs();
-      if (conf.getOutputColumnNames().size() < structFields.size()) {
-        List<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>();
-        for (Byte alias : order) {
-          int sz = conf.getExprs().get(alias).size();
-          List<Integer> retained = conf.getRetainList().get(alias);
-          for (int i = 0; i < sz; i++) {
-            int pos = retained.get(i);
-            structFieldObjectInspectors.add(structFields.get(pos)
-                .getFieldObjectInspector());
-          }
+    heartbeatInterval = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVESENDHEARTBEAT);
+    
+    joinKeys  = new HashMap<Byte, List<ExprNodeEvaluator>>();
+    
+    populateJoinKeyValue(joinKeys, conf.getKeys());
+    joinKeysObjectInspectors = getObjectInspectorsFromEvaluators(joinKeys, inputObjInspectors);
+    joinKeysStandardObjectInspectors = getStandardObjectInspectors(joinKeysObjectInspectors);
+    
+    // all other tables are small, and are cached in the hash table
+    posBigTable = conf.getPosBigTable();
+    
+    metadataValueTag = new int[numAliases];
+    for (int pos = 0; pos < numAliases; pos++)
+      metadataValueTag[pos] = -1;
+    
+    mapJoinTables = new HashMap<Byte, HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>>();
+    hTables = new ArrayList<File>();
+    
+    // initialize the hash tables for other tables
+    for (int pos = 0; pos < numAliases; pos++) {
+      if (pos == posBigTable)
+        continue;
+      
+      int cacheSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINCACHEROWS);
+      HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue> hashTable = 
+        new HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue>(cacheSize);
+      
+      mapJoinTables.put(Byte.valueOf((byte)pos), hashTable);
+    }
+    
+    storage.put((byte)posBigTable, new ArrayList<ArrayList<Object>>());
+    
+    mapJoinRowsKey = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEMAPJOINROWSIZE);
+    
+    List<? extends StructField> structFields = ((StructObjectInspector)outputObjInspector).getAllStructFieldRefs();
+    if (conf.getOutputColumnNames().size() < structFields.size()) {
+      List<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>();
+      for (Byte alias : order) {
+        int sz = conf.getExprs().get(alias).size();
+        List<Integer> retained = conf.getRetainList().get(alias);
+        for (int i = 0; i < sz; i++) {
+          int pos = retained.get(i);
+          structFieldObjectInspectors.add(structFields.get(pos)
+                                          .getFieldObjectInspector());
         }
-        outputObjInspector = ObjectInspectorFactory
-            .getStandardStructObjectInspector(conf.getOutputColumnNames(),
-                structFieldObjectInspectors);
       }
-      initializeChildren(hconf);
-    } catch (IOException e) {
-      throw new HiveException(e);
+      outputObjInspector = ObjectInspectorFactory
+        .getStandardStructObjectInspector(conf.getOutputColumnNames(),
+                                          structFieldObjectInspectors);
     }
+    initializeChildren(hconf);
   }
-
+  
   @Override
   public void processOp(Object row, int tag) throws HiveException {
     try {
-
+      
       // get alias
       alias = (byte)tag;
-
+      
       if ((lastAlias == null) || (!lastAlias.equals(alias)))
         nextSz = joinEmitInterval;
-
+      
       // compute keys and values as StandardObjects
       ArrayList<Object> key   = computeValues(row, joinKeys.get(alias), joinKeysObjectInspectors.get(alias));
       ArrayList<Object> value = computeValues(row, joinValues.get(alias), joinValuesObjectInspectors.get(alias));
@@ -237,56 +210,56 @@
         if (((numMapRowsRead % heartbeatInterval) == 0) && (reporter != null))
           reporter.progress();
 
-        HTree hashTable = mapJoinTables.get(alias);
+        HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue> hashTable =  mapJoinTables.get(alias);
         MapJoinObjectKey keyMap = new MapJoinObjectKey(metadataKeyTag, key);
-        MapJoinObjectValue o = (MapJoinObjectValue)hashTable.get(keyMap);
+        MapJoinObjectValue o = hashTable.get(keyMap);
         ArrayList<ArrayList<Object>> res = null;
 
+        boolean needNewKey = true;
         if (o == null) {
           res = new ArrayList<ArrayList<Object>>();
-        }
-        else {
+          res.add(value);
+        } else {
           res = o.getObj();
+          res.add(value);
+          // If the key already exists, HashMapWrapper.get() guarantees it is already in the main memory
+          // HashMap cache, so just replacing the object value updates the HashMapWrapper. This saves the
+          // cost of constructing a new key/value object, deleting the old one, and inserting the new one.
+          if ( hashTable.cacheSize() > 0) {
+            o.setObj(res);
+            needNewKey = false;
+          } 
         }
-
-        res.add(value);
-
+        
+        
         if (metadataValueTag[tag] == -1) {
           metadataValueTag[tag] = nextVal++;
-
+          
           tableDesc valueTableDesc = conf.getValueTblDescs().get(tag);
           SerDe valueSerDe = (SerDe)ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), null);
           valueSerDe.initialize(null, valueTableDesc.getProperties());
-
+          
           mapMetadata.put(Integer.valueOf(metadataValueTag[tag]),
-              new MapJoinObjectCtx(
-                  ObjectInspectorUtils.getStandardObjectInspector(valueSerDe.getObjectInspector(),
-                      ObjectInspectorCopyOption.WRITABLE),
-              valueSerDe));
+                          new MapJoinObjectCtx(
+                                ObjectInspectorUtils.getStandardObjectInspector(valueSerDe.getObjectInspector(),
+                                  ObjectInspectorCopyOption.WRITABLE),
+                                valueSerDe));
         }
-
+        
         // Construct externalizable objects for key and value
-        MapJoinObjectKey keyObj = new MapJoinObjectKey(metadataKeyTag, key);
-        MapJoinObjectValue valueObj = new MapJoinObjectValue(metadataValueTag[tag], res);
-
-        if (res.size() > 1)
-          hashTable.remove(keyObj);
-
-        // This may potentially increase the size of the hashmap on the mapper
-        if (res.size() > mapJoinRowsKey) {
-          if ( res.size() % 100 == 0 ) {
-            LOG.warn("Number of values for a given key " + keyObj + " are " + res.size());
-            LOG.warn("used memory " + Runtime.getRuntime().totalMemory());
+        if ( needNewKey ) {
+          MapJoinObjectKey keyObj = new MapJoinObjectKey(metadataKeyTag, key);
+          MapJoinObjectValue valueObj = new MapJoinObjectValue(metadataValueTag[tag], res);
+
+          // This may potentially increase the size of the hashmap on the mapper
+          if (res.size() > mapJoinRowsKey) {
+            if ( res.size() % 100 == 0 ) {
+              LOG.warn("Number of values for a given key " + keyObj + " are " + res.size());
+              LOG.warn("used memory " + Runtime.getRuntime().totalMemory());
+            }
           }
+          hashTable.put(keyObj, valueObj);
         }
-
-        hashTable.put(keyObj, valueObj);
-
-        // commit every 100 rows to prevent Out-of-memory exception
-        if ( (res.size() % 100 == 0) && recman != null ) {
-          recman.commit();
-        }
-
         return;
       }
 
@@ -320,12 +293,14 @@
     } catch (SerDeException e) {
       e.printStackTrace();
       throw new HiveException(e);
-    } catch (IOException e) {
-      e.printStackTrace();
-      throw new HiveException(e);
     }
   }
-
+  
+  public void closeOp(boolean abort) throws HiveException {
+    for (HashMapWrapper hashTable: mapJoinTables.values()) {
+      hashTable.close();
+    }
+  }
   /**
    * Implements the getName function for the Node Interface.
    * @return the name of the operator
@@ -334,23 +309,6 @@
     return "MAPJOIN";
   }
 
-  public void closeOp(boolean abort) throws HiveException {
-    for (File hTbl : hTables) {
-      deleteDir(hTbl);
-    }
-  }
-
-  private void deleteDir(File dir) {
-    if (dir.isDirectory()) {
-      String[] children = dir.list();
-      for (int i = 0; i < children.length; i++) {
-        deleteDir(new File(dir, children[i]));
-      }
-    }
-
-    dir.delete();
-  }
-
   public int getType() {
     return OperatorType.MAPJOIN;
   }
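
Condensing the new processOp() flow for a small-table row (a rough paraphrase of the patch above, not the literal code; error handling and metadata setup are omitted):

    HashMapWrapper<MapJoinObjectKey, MapJoinObjectValue> hashTable = mapJoinTables.get(alias);
    MapJoinObjectKey keyMap = new MapJoinObjectKey(metadataKeyTag, key);
    MapJoinObjectValue o = hashTable.get(keyMap);   // if found, it is guaranteed to be in the in-memory cache
    if (o != null && hashTable.cacheSize() > 0) {
      ArrayList<ArrayList<Object>> res = o.getObj();
      res.add(value);
      o.setObj(res);                                // mutate in place; no remove/re-put needed
    } else {
      ArrayList<ArrayList<Object>> res = (o == null) ? new ArrayList<ArrayList<Object>>() : o.getObj();
      res.add(value);
      hashTable.put(new MapJoinObjectKey(metadataKeyTag, key),
                    new MapJoinObjectValue(metadataValueTag[tag], res));
    }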

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/RecordManagerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/RecordManagerFactory.java?rev=889097&r1=889096&r2=889097&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/RecordManagerFactory.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/RecordManagerFactory.java Thu Dec 10 05:49:24 2009
@@ -67,6 +67,7 @@
 package org.apache.hadoop.hive.ql.util.jdbm;
 
 import java.io.IOException;
+import java.io.File;
 import java.util.Properties;
 
 /**
@@ -112,6 +113,18 @@
                                                      Properties options )
         throws IOException
     {
+      RecordManagerProvider factory = getFactory(options);
+      return factory.createRecordManager( name, options );
+    }
+    
+    public static RecordManager createRecordManager( File file, Properties options)
+        throws IOException
+    {
+      RecordManagerProvider factory = getFactory(options);
+      return factory.createRecordManager( file, options );
+    }
+      
+    private static RecordManagerProvider getFactory(Properties options)  {
         String                 provider;
         Class                  clazz;
         RecordManagerProvider  factory;
@@ -129,7 +142,6 @@
                                                 + ": " + except.getMessage()
                                                 + "]" );
         }
-        return factory.createRecordManager( name, options );
+        return factory;
     }
-
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/RecordManagerOptions.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/RecordManagerOptions.java?rev=889097&r1=889096&r2=889097&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/RecordManagerOptions.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/RecordManagerOptions.java Thu Dec 10 05:49:24 2009
@@ -137,5 +137,10 @@
      * finalizable, finalized, and then reclaimed.
      */
     public static final String WEAK_REF_CACHE = "weak";
+    
+    /**
+     * Disable cache.
+     */
+    public static final String NO_CACHE = "nocache";
 
 }
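
For context, the new File-based factory entry point and the NO_CACHE option above are what HashMapWrapper uses to back its persistent HTree; a condensed sketch of that call sequence (imports as in HashMapWrapper.java):

    File tmpFile = File.createTempFile("HashMapWrapper", ".tmp");
    tmpFile.deleteOnExit();
    Properties props = new Properties();
    props.setProperty(RecordManagerOptions.CACHE_TYPE, RecordManagerOptions.NO_CACHE);
    props.setProperty(RecordManagerOptions.DISABLE_TRANSACTIONS, "true");
    RecordManager recman = RecordManagerFactory.createRecordManager(tmpFile, props);
    HTree pHash = HTree.createInstance(recman);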

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/RecordManagerProvider.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/RecordManagerProvider.java?rev=889097&r1=889096&r2=889097&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/RecordManagerProvider.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/RecordManagerProvider.java Thu Dec 10 05:49:24 2009
@@ -67,6 +67,7 @@
 package org.apache.hadoop.hive.ql.util.jdbm;
 
 import java.io.IOException;
+import java.io.File;
 import java.util.Properties;
 
 /**
@@ -93,4 +94,8 @@
     public RecordManager createRecordManager( String filename,
                                               Properties options )
         throws IOException;
+    
+    public RecordManager createRecordManager( File file,
+                                              Properties options )
+        throws IOException;
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/BaseRecordManager.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/BaseRecordManager.java?rev=889097&r1=889096&r2=889097&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/BaseRecordManager.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/BaseRecordManager.java Thu Dec 10 05:49:24 2009
@@ -67,6 +67,7 @@
 package org.apache.hadoop.hive.ql.util.jdbm.recman;
 
 import java.io.IOException;
+import java.io.File;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -160,6 +161,23 @@
         _physMgr = new PhysicalRowIdManager( _file, _pageman );
         _logMgr = new LogicalRowIdManager( _file, _pageman );
     }
+    
+    /**
+     *  Creates a record manager for the indicated file
+     *
+     *  @throws IOException when the file cannot be opened or is not
+     *          a valid file content-wise.
+     */
+    public BaseRecordManager( File file )
+        throws IOException
+    {
+        _file = new RecordFile( file );
+        _pageman = new PageManager( _file );
+        _physMgr = new PhysicalRowIdManager( _file, _pageman );
+        _logMgr = new LogicalRowIdManager( _file, _pageman );
+    }
+    
+    
 
 
     /**

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/Provider.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/Provider.java?rev=889097&r1=889096&r2=889097&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/Provider.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/Provider.java Thu Dec 10 05:49:24 2009
@@ -67,6 +67,7 @@
 package org.apache.hadoop.hive.ql.util.jdbm.recman;
 
 import java.io.IOException;
+import java.io.File;
 import java.util.Properties;
 
 import org.apache.hadoop.hive.ql.util.jdbm.RecordManager;
@@ -101,11 +102,17 @@
         throws IOException
     {
         RecordManager  recman;
-        String         value;
-        int            cacheSize;
 
         recman = new BaseRecordManager( name );
-
+        recman = getCachedRecordManager(recman, options);
+        return recman;
+    }
+    
+    private RecordManager getCachedRecordManager(RecordManager recman, Properties options)
+    {
+        String         value;
+        int            cacheSize;
+        
         value = options.getProperty( RecordManagerOptions.DISABLE_TRANSACTIONS, "false" );
         if ( value.equalsIgnoreCase( "TRUE" ) ) {
             ( (BaseRecordManager) recman ).disableTransactions();
@@ -123,12 +130,22 @@
             throw new IllegalArgumentException( "Soft reference cache not implemented" );
         } else if ( value.equalsIgnoreCase( RecordManagerOptions.WEAK_REF_CACHE ) ) {
             throw new IllegalArgumentException( "Weak reference cache not implemented" );
+        } else if ( value.equalsIgnoreCase(RecordManagerOptions.NO_CACHE) ){
+          // do nothing
         } else {
             throw new IllegalArgumentException( "Invalid cache type: " + value );
-        }
+        } 
 
         return recman;
     }
 
+    public RecordManager createRecordManager( File file,
+                                              Properties options )
+        throws IOException
+    {
+      RecordManager recman = new BaseRecordManager(file);
+      recman = getCachedRecordManager(recman, options);
+      return recman;
+    }
 
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordFile.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordFile.java?rev=889097&r1=889096&r2=889097&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordFile.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/RecordFile.java Thu Dec 10 05:49:24 2009
@@ -116,6 +116,21 @@
         file = new RandomAccessFile(fileName + extension, "rw");
         txnMgr = new TransactionManager(this);
     }
+    
+    /**
+     *  Creates a new object on the indicated file. The file is
+     *  opened in read/write mode.
+     *
+     *  @param file the file to open or create.
+     *  @throws IOException whenever the creation of the underlying
+     *          RandomAccessFile throws it.
+     */
+    RecordFile(File file) throws IOException {
+        this.fileName = file.getName();
+        this.file = new RandomAccessFile(file, "rw");
+        txnMgr = new TransactionManager(this);
+    }
 
     /**
      *  Returns the file name.
@@ -308,6 +323,9 @@
             commit();
         }
         txnMgr.shutdown();
+        if ( transactionsDisabled ) {
+          txnMgr.removeLogFile();
+        }
 
         if (!inTxn.isEmpty()) {
             showList(inTxn.values().iterator());

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/TransactionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/TransactionManager.java?rev=889097&r1=889096&r2=889097&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/TransactionManager.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/jdbm/recman/TransactionManager.java Thu Dec 10 05:49:24 2009
@@ -111,6 +111,9 @@
 
     /** Extension of a log file. */
     static final String extension = ".lg";
+    
+    /** log file name */
+    private String logFileName;
 
     /**
      *  Instantiates a transaction manager instance. If recovery
@@ -120,6 +123,7 @@
      */
     TransactionManager(RecordFile owner) throws IOException {
         this.owner = owner;
+        logFileName = null;
         recover();
         open();
     }
@@ -206,7 +210,8 @@
 
     /** Opens the log file */
     private void open() throws IOException {
-        fos = new FileOutputStream(makeLogName());
+        logFileName = makeLogName(); 
+        fos = new FileOutputStream(logFileName);
         oos = new ObjectOutputStream(fos);
         oos.writeShort(Magic.LOGFILE_HEADER);
         oos.flush();
@@ -362,6 +367,17 @@
         oos = null;
         fos = null;
     }
+    
+    public void removeLogFile() {
+      // if file is not closed yet, just return
+      if ( oos != null ) 
+        return;
+      if ( logFileName != null ) {
+        File file = new File(logFileName);
+        file.delete();
+        logFileName = null;
+      }
+    }
 
     /**
      * Force closing the file without synchronizing pending transaction data.

Added: hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHashMapWrapper.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHashMapWrapper.java?rev=889097&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHashMapWrapper.java (added)
+++ hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestHashMapWrapper.java Thu Dec 10 05:49:24 2009
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import junit.framework.TestCase;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.exec.HashMapWrapper;
+
+public class TestHashMapWrapper extends TestCase {
+
+  public void testHashMapWrapper() throws Exception {
+
+    HashMap<String, String> mem_map = new HashMap<String, String>();
+    mem_map.put("k1", "v1");
+    mem_map.put("k2", "v2");
+    mem_map.put("k3", "v3");
+    mem_map.put("k4", "v4");
+    
+    try {
+      // NO cache
+      HashMapWrapper<String, String> wrapper =
+        new HashMapWrapper<String, String>(0);
+      insertAll(wrapper, mem_map);
+      checkAll(wrapper, mem_map);
+      wrapper.close();  // clean up temporary files
+      
+      // cache size = 1
+      wrapper = new HashMapWrapper<String, String>(1);
+      insertAll(wrapper, mem_map);
+      checkAll(wrapper, mem_map);
+      wrapper.close();  // clean up temporary files
+      
+      // cache size = 2
+      wrapper = new HashMapWrapper<String, String>(2);
+      insertAll(wrapper, mem_map);
+      checkAll(wrapper, mem_map);
+      wrapper.close();  // clean up temporary files
+      
+      // cache size = 4
+      wrapper = new HashMapWrapper<String, String>(4);
+      insertAll(wrapper, mem_map);
+      checkAll(wrapper, mem_map);
+      wrapper.close();  // clean up temporary files
+      
+      // default cache size (25000)
+      wrapper = new HashMapWrapper<String, String>();
+      insertAll(wrapper, mem_map);
+      checkAll(wrapper, mem_map);
+      wrapper.close();  // clean up temporary files
+      
+      // check mixed put/remove/get functions
+      wrapper = new HashMapWrapper<String, String>(2);
+      insertAll(wrapper, mem_map);
+      wrapper.remove("k3"); // k3 is in HTree
+      mem_map.remove("k3");
+      assertTrue(mem_map.size() == 3);
+      checkAll(wrapper, mem_map);
+      
+      wrapper.remove("k1");
+      mem_map.remove("k1");
+      checkAll(wrapper, mem_map);
+      
+      String v4 = wrapper.get("k4");
+      assertTrue(v4 != null);
+      assert(v4.equals("v4"));
+      
+      wrapper.remove("k4");
+      mem_map.remove("k4");
+      checkAll(wrapper, mem_map);
+      
+      wrapper.put("k5", "v5"); 
+      mem_map.put("k5", "v5"); 
+      checkAll(wrapper, mem_map);
+      
+      wrapper.put("k6", "v6"); 
+      mem_map.put("k6", "v6"); 
+      checkAll(wrapper, mem_map);
+      
+      wrapper.put("k6", "v61"); 
+      mem_map.put("k6", "v61"); 
+      checkAll(wrapper, mem_map);
+      
+      wrapper.remove("k6");
+      mem_map.remove("k6");
+      checkAll(wrapper, mem_map);
+      
+      // get k1, k2 to main memory
+      wrapper.get("k1");
+      wrapper.get("k2");
+      // delete k1 so that cache is half empty
+      wrapper.remove("k1");
+      mem_map.remove("k1");
+      // put new pair (k6, v7) so that it will be in persistent hash
+      wrapper.put("k6", "v7");
+      mem_map.put("k6", "v7");
+      checkAll(wrapper, mem_map);
+      
+      // test clear
+      wrapper.clear();
+      mem_map.clear();
+      checkAll(wrapper, mem_map);
+      wrapper.close();  // clean up temporary files
+      
+      // insert 3,000 pairs random testing
+      wrapper = new HashMapWrapper<String, String>(1000);
+      for ( int i = 0; i < 3000; ++i ) {
+        String k = "k" + i;
+        String v = "v" + i;
+        wrapper.put(k, v);
+        mem_map.put(k, v);
+      }
+      checkAll(wrapper, mem_map);
+      System.out.println("Finished inserting 3000 pairs.");
+      
+      // do 10,000 random get/remove operations
+      Random rand = new Random(12345678);
+      for ( int i = 0; i < 10000; ++i ) {
+        int j = rand.nextInt(3000);
+        String k = "k" + j;
+        String v;
+        
+        int command = rand.nextInt(3);
+        switch (command) {
+        case 0: // remove
+          // System.out.println("removing " + k);// uncomment this for debugging
+          wrapper.remove(k);
+          mem_map.remove(k);
+          break;
+        case 1: // get
+          // System.out.println("getting " + k);// uncomment this for debugging
+          v =  wrapper.get(k);
+          String v2 = mem_map.get(k);
+          assertTrue("one of them doesn't exists or different values from two hash tables", 
+                     v == null && v2 == null || v.equals(v2));
+          break;
+        case 2: // put
+          v = "v" + rand.nextInt(3000);
+          // System.out.println("putting (" + k + ", " + v);// uncomment this for debugging
+          wrapper.put(k, v);
+          mem_map.put(k, v);
+          break;
+        }
+        // checkAll(wrapper, mem_map); // uncomment this for debugging
+      }
+      checkAll(wrapper, mem_map);
+      wrapper.close();  // clean up temporary files
+    } catch (Exception e) {
+      e.printStackTrace();
+      System.out.println(e.toString());
+      assertTrue("Exception should not be thrown.", false);
+    }
+    System.out.println("TestHashMapWrapper successful");
+  }
+  
+  private void insertAll(HashMapWrapper<String, String> hashTable, 
+                         HashMap<String, String> map) 
+    throws HiveException {
+    
+    for (String k: map.keySet()) {
+      String v = map.get(k);
+      hashTable.put(k, v);
+    }
+  }
+  
+  private void checkAll(HashMapWrapper<String, String> hashTable, 
+                        HashMap<String, String> map) 
+    throws HiveException {
+    
+    // check each item in the HashMapWrapper was actually inserted
+    for ( String k: hashTable.keySet() ) {
+      String map_val = hashTable.get(k);
+      String val = map.get(k);
+      assertTrue("some HashMapWrapper value is not in main memory HashMap: map_val = " + map_val + "; val = " + val, 
+                 map_val != null && val != null);
+      assertTrue("value in HashMapWrapper is not the same as MM HashMap: map_val = " + map_val + "; val = " + val, 
+                 val.equals(map_val));
+    }
+    
+    // check all inserted elements are in HashMapWrapper
+    for ( String k: map.keySet() ) {
+      String map_val = hashTable.get(k);
+      String val = map.get(k);
+      assertTrue("Some MM HashMap key is not in HashMapWrapper: map_val = " + map_val + "; val = " + val, 
+                 map_val != null && val != null);
+      assertTrue("Value in MM HashMap is not in HashMapWrapper: map_val = " + map_val + "; val = " + val, 
+                 val.equals(map_val));
+    }
+  }
+}


