hive-commits mailing list archives

From: heyongqi...@apache.org
Subject: svn commit: r1034276 [3/14] - in /hive/trunk: ./ ql/src/gen-javabean/org/apache/hadoop/hive/ql/plan/api/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/ ql/src/java/org/apache/hadoop/hive/ql/optimizer...
Date: Fri, 12 Nov 2010 06:12:47 GMT
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java?rev=1034276&r1=1034275&r2=1034276&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java Fri Nov 12 06:12:44 2010
@@ -19,456 +19,233 @@
 package org.apache.hadoop.hive.ql.exec.persistence;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.text.NumberFormat;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.util.jdbm.RecordManager;
-import org.apache.hadoop.hive.ql.util.jdbm.RecordManagerFactory;
-import org.apache.hadoop.hive.ql.util.jdbm.RecordManagerOptions;
-import org.apache.hadoop.hive.ql.util.jdbm.helper.FastIterator;
-import org.apache.hadoop.hive.ql.util.jdbm.htree.HTree;
+import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+
 
 /**
- * Simple wrapper for persistent Hashmap implementing only the
- * put/get/remove/clear interface. The main memory hash table acts as a cache
- * and all put/get will operate on it first. If the size of the main memory hash
- * table exceeds a certain threshold, new elements will go into the persistent
+ * Simple wrapper for persistent Hashmap implementing only the put/get/remove/clear interface. The
+ * main memory hash table acts as a cache and all put/get will operate on it first. If the size of
+ * the main memory hash table exceeds a certain threshold, new elements will go into the persistent
  * hash table.
  */
-public class HashMapWrapper<K, V> {
+public class HashMapWrapper<K, V> implements Serializable {
 
   protected Log LOG = LogFactory.getLog(this.getClass().getName());
 
   // default threshold for using main memory based HashMap
-  private static final int THRESHOLD = 25000;
+  private static final int THRESHOLD = 1000000;
+  private static final float LOADFACTOR = 0.75f;
 
-  private int threshold; // threshold to put data into persistent hash table
+  private double threshold; // threshold to put data into persistent hash table
   // instead
-  private HashMap<K, MRUItem> mHash; // main memory HashMap
-  private HTree pHash; // persistent HashMap
-  private RecordManager recman; // record manager required by HTree
-  private File tmpFile; // temp file holding the persistent data from record
-  // manager.
-  private MRU<MRUItem> MRUList; // MRU cache entry
+  private HashMap<K, V> mHash; // main memory HashMap
 
-  /**
-   * Doubly linked list of value items. Note: this is only used along with
-   * memory hash table. Persistent hash stores the value directory.
-   */
-  class MRUItem extends DCLLItem {
-    K key;
-    V value;
-
-    MRUItem(K k, V v) {
-      key = k;
-      value = v;
-    }
-  }
+
+
+  protected transient LogHelper console;
+
+  private File dumpFile;
+  public static MemoryMXBean memoryMXBean;
+  private long maxMemory;
+  private long currentMemory;
+  private NumberFormat num;
 
   /**
    * Constructor.
    *
    * @param threshold
-   *          User specified threshold to store new values into persistent
-   *          storage.
+   *          User specified threshold to store new values into persistent storage.
    */
+  public HashMapWrapper(int threshold, float loadFactor) {
+    this.threshold = 0.9;
+    mHash = new HashMap<K, V>(threshold, loadFactor);
+    console = new LogHelper(LOG);
+    memoryMXBean = ManagementFactory.getMemoryMXBean();
+    maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
+    LOG.info("maximum memory: " + maxMemory);
+    num = NumberFormat.getInstance();
+    num.setMinimumFractionDigits(2);
+  }
+
   public HashMapWrapper(int threshold) {
-    this.threshold = threshold;
-    this.pHash = null;
-    this.recman = null;
-    this.tmpFile = null;
-    mHash = new HashMap<K, MRUItem>();
-    MRUList = new MRU<MRUItem>();
+    this(THRESHOLD, 0.75f);
   }
 
   public HashMapWrapper() {
-    this(THRESHOLD);
+    this(THRESHOLD, LOADFACTOR);
   }
 
-  /**
-   * Get the value based on the key. this GET method will directly
-   * return the value from jdbm storage.
-   * @param key
-   * @return Value corresponding to the key. If the key is not found, return
-   *         null.
-   */
-/*
-  public V getMapJoinValueObject(K key) throws HiveException{
-    if(pHash == null) {
-      LOG.warn("the jdbm object is not ready!");
-      throw new HiveException();
-    }
-    try{
-      V value = (V)pHash.get(key);
-      return value;
-    }catch(Exception e){
-      throw new HiveException(e);
-    }
-  }*/
-
-  /*
-   * In this get operation, the jdbm should read only
-   */
-  public V getMapJoinValueObject(K key) throws HiveException {
-    V value = null;
 
-    // if not the MRU, searching the main memory hash table.
-    MRUItem item = mHash.get(key);
-    if (item != null) {
-      value = item.value;
-      MRUList.moveToHead(item);
-    } else if (pHash != null) {
-      try {
-        value = (V) pHash.get(key);
-        if (value != null) {
-          if (mHash.size() < threshold) {
-            MRUItem itm= new MRUItem(key, value);
-            mHash.put(key, itm);
-            //pHash.remove(key);
-            MRUList.put(itm);
-            //recman.commit();
-
-          } else if (threshold > 0) { // flush the LRU to disk
-            MRUItem tail = MRUList.tail(); // least recently used item
-            //pHash.put(tail.key, tail.value);
-            //pHash.remove(key);
-            //recman.commit();
-
-            // update mHash -- reuse MRUItem
-            item = mHash.remove(tail.key);
-            item.key = key;
-            item.value = value;
-            mHash.put(key, item);
-
-            // update MRU -- reusing MRUItem
-            tail.key = key;
-            tail.value = value;
-            MRUList.moveToHead(tail);
-          }
-        }
-      } catch (Exception e) {
-        LOG.warn(e.toString());
-        throw new HiveException(e);
-      }
-    }
-    return value;
-  }
-  public V get(K key) throws HiveException {
-    V value = null;
-
-    // if not the MRU, searching the main memory hash table.
-    MRUItem item = mHash.get(key);
-    if (item != null) {
-      value = item.value;
-      MRUList.moveToHead(item);
-    } else if (pHash != null) {
-      try {
-        value = (V) pHash.get(key);
-        if (value != null) {
-          if (mHash.size() < threshold) {
-            mHash.put(key, new MRUItem(key, value));
-            pHash.remove(key);
-          } else if (threshold > 0) { // flush the LRU to disk
-            MRUItem tail = MRUList.tail(); // least recently used item
-            pHash.put(tail.key, tail.value);
-            pHash.remove(key);
-            recman.commit();
-
-            // update mHash -- reuse MRUItem
-            item = mHash.remove(tail.key);
-            item.key = key;
-            item.value = value;
-            mHash.put(key, item);
-
-            // update MRU -- reusing MRUItem
-            tail.key = key;
-            tail.value = value;
-            MRUList.moveToHead(tail);
-          }
-        }
-      } catch (Exception e) {
-        LOG.warn(e.toString());
-        throw new HiveException(e);
-      }
-    }
-    return value;
+  public V get(K key) {
+    return mHash.get(key);
   }
 
-  /**
-   * Put the key value pair in the hash table. It will first try to put it into
-   * the main memory hash table. If the size exceeds the threshold, it will put
-   * it into the persistent hash table.
-   *
-   * @param key
-   * @param value
-   * @throws HiveException
-   */
-  public void put(K key, V value) throws HiveException {
-    int mm_size = mHash.size();
-    MRUItem itm = mHash.get(key);
-
-    if (mm_size < threshold) {
-      if (itm != null) {
-        // re-use the MRU item -- just overwrite value, key is the same
-        itm.value = value;
-        MRUList.moveToHead(itm);
-        if (!mHash.get(key).value.equals(value)) {
-          LOG.error("HashMapWrapper.put() reuse MRUItem inconsistency [1].");
-        }
-        assert (mHash.get(key).value.equals(value));
-      } else {
-        // check if key already exists in pHash
-        try {
-          if (pHash != null && pHash.get(key) != null) {
-            // remove the old item from pHash and insert the new one
-            pHash.remove(key);
-            pHash.put(key, value);
-            recman.commit();
-            return;
-          }
-        } catch (Exception e) {
-          e.printStackTrace();
-          throw new HiveException(e);
-        }
-        itm = new MRUItem(key, value);
-        MRUList.put(itm);
-        mHash.put(key, itm);
-      }
-    } else {
-      if (itm != null) { // replace existing item
-        // re-use the MRU item -- just overwrite value, key is the same
-        itm.value = value;
-        MRUList.moveToHead(itm);
-        if (!mHash.get(key).value.equals(value)) {
-          LOG.error("HashMapWrapper.put() reuse MRUItem inconsistency [2].");
-        }
-        assert (mHash.get(key).value.equals(value));
-      } else {
-        // for items inserted into persistent hash table, we don't put it into
-        // MRU
-        if (pHash == null) {
-          pHash = getPersistentHash();
-        }
-        try {
-          pHash.put(key, value);
-          recman.commit();
-        } catch (Exception e) {
-          LOG.warn(e.toString());
-          throw new HiveException(e);
-        }
-      }
-    }
-  }
-
-  public void putToJDBM(K key, V value) throws HiveException{
-    if (pHash == null) {
-      pHash = getPersistentHash();
-    }
-    try {
-      pHash.put(key, value);
-      recman.commit();
-    } catch (Exception e) {
-      LOG.warn(e.toString());
-      throw new HiveException(e);
-    }
 
+  public boolean put(K key, V value) throws HiveException {
+    // isAbort();
+    mHash.put(key, value);
+    return false;
+  }
+
+  public void remove(K key) {
+    mHash.remove(key);
   }
 
+
   /**
    * Flush the main memory hash table into the persistent cache file
    *
    * @return persistent cache file
    */
-  public String flushMemoryCacheToPersistent() throws HiveException{
-    try{
-      //if no persistent cache file; create a new one
-      if(pHash == null){
-        pHash = getPersistentHash();
-      }
-      int mm_size = mHash.size();
-      //no data in the memory cache
-      if(mm_size == 0){
-        return tmpFile.getAbsolutePath();
-      }
-      //iterate the memory hash table and put them into persistent file
-      for (Map.Entry<K, MRUItem> entry : mHash.entrySet()) {
-        K key = entry.getKey();
-        MRUItem item = entry.getValue();
-        pHash.put(key, item.value);
-      }
-      //commit to the persistent file
-      recman.commit();
-
-      //release the memory
-      mHash.clear();
-
-    }catch (Exception e) {
-      LOG.warn(e.toString());
-      throw new HiveException(e);
-    }
-    return tmpFile.getAbsolutePath();
-  }
-
-  public void initilizePersistentHash(File jdbmfile) throws HiveException{
-    try{
-      Properties props = new Properties();
-      props.setProperty(RecordManagerOptions.CACHE_TYPE,
-          RecordManagerOptions.NORMAL_CACHE);
-      props.setProperty(RecordManagerOptions.DISABLE_TRANSACTIONS, "true");
-
-      recman = RecordManagerFactory.createRecordManager(jdbmfile, props);
-      long recid = recman.getNamedObject( "hashtable" );
-      if ( recid != 0 ) {
-          pHash = HTree.load( recman, recid );
-      }else{
-        LOG.warn("initiliaze the hash table by jdbm file Error!");
-        throw new HiveException();
-      }
-
-    } catch (Exception e) {
-      e.printStackTrace();
-      LOG.warn(e.toString());
-      throw new HiveException(e);
-    }
+  public long flushMemoryCacheToPersistent(File file) throws IOException {
+    ObjectOutputStream outputStream = null;
+    outputStream = new ObjectOutputStream(new FileOutputStream(file));
+    outputStream.writeObject(mHash);
+    outputStream.flush();
+    outputStream.close();
+
+    return file.length();
   }
 
+  public void initilizePersistentHash(String fileName) throws IOException, ClassNotFoundException {
+    ObjectInputStream inputStream = null;
+    inputStream = new ObjectInputStream(new FileInputStream(fileName));
+    HashMap<K, V> hashtable = (HashMap<K, V>) inputStream.readObject();
+    this.setMHash(hashtable);
+
+    inputStream.close();
+  }
+
+  public int size() {
+    return mHash.size();
+  }
+
+  public Set<K> keySet() {
+    return mHash.keySet();
+  }
+
+
   /**
-   * Get the persistent hash table.
+   * Close the persistent hash table and clean it up.
    *
-   * @return persistent hash table
    * @throws HiveException
    */
-  private HTree getPersistentHash() throws HiveException {
-    try {
-      // Create a temporary file for the page manager to hold persistent data.
-      if (tmpFile != null) {
-        tmpFile.delete();
-      }
-      tmpFile = File.createTempFile("HashMapWrapper", ".tmp", new File("/tmp"));
-      LOG.info("HashMapWrapper created temp file " + tmpFile.getAbsolutePath());
-      // Delete the temp file if the JVM terminate normally through Hadoop job
-      // kill command.
-      // Caveat: it won't be deleted if JVM is killed by 'kill -9'.
-      tmpFile.deleteOnExit();
-
-      Properties props = new Properties();
-      props.setProperty(RecordManagerOptions.CACHE_TYPE,
-          RecordManagerOptions.NO_CACHE);
-      props.setProperty(RecordManagerOptions.DISABLE_TRANSACTIONS, "true");
-
-      recman = RecordManagerFactory.createRecordManager(tmpFile, props);
-      pHash = HTree.createInstance(recman);
-      recman.setNamedObject( "hashtable", pHash.getRecid() );
-      //commit to the persistent file
-      recman.commit();
-    } catch (Exception e) {
-      LOG.warn(e.toString());
-      throw new HiveException(e);
-    }
-    return pHash;
+  public void close() throws HiveException {
+    // isAbort();
+    mHash.clear();
   }
 
-  /**
-   * Clean up the hash table. All elements in the main memory hash table will be
-   * removed, and the persistent hash table will be destroyed (temporary file
-   * will be deleted).
-   */
   public void clear() throws HiveException {
-    if (mHash != null) {
-      mHash.clear();
-      MRUList.clear();
-    }
-    close();
+    mHash.clear();
   }
 
-  /**
-   * Remove one key-value pairs from the hash table based on the given key. If
-   * the pairs are removed from the main memory hash table, pairs in the
-   * persistent hash table will not be moved to the main memory hash table.
-   * Future inserted elements will go into the main memory hash table though.
-   *
-   * @param key
-   * @throws HiveException
-   */
-  public void remove(Object key) throws HiveException {
-    MRUItem entry = mHash.remove(key);
-    if (entry != null) {
-      MRUList.remove(entry);
-    } else if (pHash != null) {
-      try {
-        pHash.remove(key);
-      } catch (Exception e) {
-        LOG.warn(e.toString());
-        throw new HiveException(e);
-      }
-    }
+  public int getKeySize() {
+    return mHash.size();
   }
 
-  /**
-   * Get a list of all keys in the hash map.
-   *
-   * @return
-   */
-  public Set<K> keySet() {
-    HashSet<K> ret = null;
-    if (mHash != null) {
-      ret = new HashSet<K>();
-      ret.addAll(mHash.keySet());
-    }
-    if (pHash != null) {
-      try {
-        FastIterator fitr = pHash.keys();
-        if (fitr != null) {
-          K k;
-          while ((k = (K) fitr.next()) != null) {
-            ret.add(k);
-          }
-        }
-      } catch (Exception e) {
-        e.printStackTrace();
-      }
-    }
-    return ret;
+  private boolean isAbort() {
+    int size = mHash.size();
+    // if(size >= 1000000 && size % 1000000 == 0 ){
+    System.gc();
+    System.gc();
+    long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
+    double rate = (double) usedMemory / (double) maxMemory;
+    long mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+    console.printInfo("Hashtable size:\t" + size + "\tMemory usage:\t" + usedMemory + "\t rate:\t"
+        + num.format(rate));
+    return true;
+
   }
 
-  /**
-   * Get the main memory cache capacity.
-   *
-   * @return the maximum number of items can be put into main memory HashMap
-   *         cache.
-   */
-  public int cacheSize() {
+  public Log getLOG() {
+    return LOG;
+  }
+
+  public void setLOG(Log log) {
+    LOG = log;
+  }
+
+  public double getThreshold() {
     return threshold;
   }
 
-  /**
-   * Close the persistent hash table and clean it up.
-   *
-   * @throws HiveException
-   */
-  public void close() throws HiveException {
+  public void setThreshold(double threshold) {
+    this.threshold = threshold;
+  }
+
+  public HashMap<K, V> getMHash() {
+    return mHash;
+  }
+
+  public void setMHash(HashMap<K, V> hash) {
+    mHash = hash;
+  }
+
+  public LogHelper getConsole() {
+    return console;
+  }
+
+  public void setConsole(LogHelper console) {
+    this.console = console;
+  }
+
+  public File getDumpFile() {
+    return dumpFile;
+  }
+
+  public void setDumpFile(File dumpFile) {
+    this.dumpFile = dumpFile;
+  }
+
+  public static MemoryMXBean getMemoryMXBean() {
+    return memoryMXBean;
+  }
+
+  public static void setMemoryMXBean(MemoryMXBean memoryMXBean) {
+    HashMapWrapper.memoryMXBean = memoryMXBean;
+  }
+
+  public long getMaxMemory() {
+    return maxMemory;
+  }
 
-    if (pHash != null) {
-      try {
-        if (recman != null) {
-          recman.close();
-        }
-      } catch (Exception e) {
-        throw new HiveException(e);
-      }
-      // delete the temporary file
-      if(tmpFile != null){
-        tmpFile.delete();
-        tmpFile = null;
-      }
-      pHash = null;
-      recman = null;
-    }
+  public void setMaxMemory(long maxMemory) {
+    this.maxMemory = maxMemory;
   }
+
+  public long getCurrentMemory() {
+    return currentMemory;
+  }
+
+  public void setCurrentMemory(long currentMemory) {
+    this.currentMemory = currentMemory;
+  }
+
+  public NumberFormat getNum() {
+    return num;
+  }
+
+  public void setNum(NumberFormat num) {
+    this.num = num;
+  }
+
+  public static int getTHRESHOLD() {
+    return THRESHOLD;
+  }
+
 }

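The rewritten HashMapWrapper above drops the JDBM-backed persistent hash table and MRU cache in favor of a plain in-memory HashMap that is dumped to a local file with ObjectOutputStream and reloaded with ObjectInputStream. The following is a minimal usage sketch, assuming the Hive classes from this revision are on the classpath; the class name HashMapWrapperSketch and the temp-file name are illustrative only and not part of the commit.

import java.io.File;

import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;

public class HashMapWrapperSketch {
  public static void main(String[] args) throws Exception {
    // Small-table side: build the hash table purely in memory.
    HashMapWrapper<String, String> table = new HashMapWrapper<String, String>();
    table.put("key1", "value1");
    table.put("key2", "value2");

    // Dump the whole HashMap to a local file via Java serialization.
    File dump = File.createTempFile("hashmapwrapper", ".hashtable");
    long bytesWritten = table.flushMemoryCacheToPersistent(dump);
    System.out.println(table.size() + " entries, " + bytesWritten + " bytes on disk");

    // Map-join side: reload the serialized HashMap and probe it.
    HashMapWrapper<String, String> probe = new HashMapWrapper<String, String>();
    probe.initilizePersistentHash(dump.getAbsolutePath());
    System.out.println(probe.get("key1"));   // plain HashMap lookup, no JDBM involved
    probe.close();
  }
}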
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java?rev=1034276&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java Fri Nov 12 06:12:44 2010
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+
+import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.io.Writable;
+
+public class MapJoinDoubleKeys extends AbstractMapJoinKey {
+
+  protected transient Object obj1;
+  protected transient Object obj2;
+
+
+  public MapJoinDoubleKeys() {
+  }
+
+  /**
+   * @param metadataTag
+   * @param obj
+   */
+  public MapJoinDoubleKeys(Object obj1, Object obj2) {
+    this.obj1 = obj1;
+    this.obj2 = obj2;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof MapJoinDoubleKeys) {
+      MapJoinDoubleKeys mObj = (MapJoinDoubleKeys) o;
+      Object key1 = mObj.getObj1();
+      Object key2 = mObj.getObj2();
+
+      if ((obj1 == null) && (key1 == null)) {
+        if ((obj2 == null) && (key2 == null)) {
+          return true;
+        }
+      }
+      if ((obj1 != null) && (key1 != null)) {
+        if (obj1.equals(key1)) {
+          if ((obj2 != null) && (key2 != null)) {
+            if (obj2.equals(key2)) {
+              return true;
+            }
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    int hashCode = 1;
+    if (obj1 == null) {
+      hashCode = metadataTag;
+    } else {
+      hashCode += (31 + obj1.hashCode());
+    }
+    if (obj2 == null) {
+      hashCode += metadataTag;
+    } else {
+      hashCode += (31 + obj2.hashCode());
+    }
+    return hashCode;
+  }
+
+  @Override
+  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    try {
+      // get the tableDesc from the map stored in the mapjoin operator
+      HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+
+      Writable val = ctx.getSerDe().getSerializedClass().newInstance();
+      val.readFields(in);
+
+
+
+      ArrayList<Object> list = (ArrayList<Object>) ObjectInspectorUtils.copyToStandardObject(ctx
+          .getSerDe().deserialize(val), ctx.getSerDe().getObjectInspector(),
+          ObjectInspectorCopyOption.WRITABLE);
+
+      if (list == null) {
+        obj1 = null;
+        obj2 = null;
+
+      } else {
+        obj1 = list.get(0);
+        obj2 = list.get(1);
+      }
+
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
+  }
+
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    try {
+      // out.writeInt(metadataTag);
+      // get the tableDesc from the map stored in the mapjoin operator
+      HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+
+      ArrayList<Object> list = MapJoinMetaData.getList();
+      list.add(obj1);
+      list.add(obj2);
+      // Different processing for key and value
+      Writable outVal = ctx.getSerDe().serialize(list, ctx.getStandardOI());
+      outVal.write(out);
+
+    } catch (SerDeException e) {
+      throw new IOException(e);
+    }
+  }
+
+
+
+  /**
+   * @return the obj
+   */
+  public Object getObj1() {
+    return obj1;
+  }
+
+  /**
+   * @param obj
+   *          the obj to set
+   */
+  public void setObj1(Object obj1) {
+    this.obj1 = obj1;
+  }
+
+  /**
+   * @return the obj
+   */
+  public Object getObj2() {
+    return obj2;
+  }
+
+  /**
+   * @param obj
+   *          the obj to set
+   */
+  public void setObj2(Object obj2) {
+    this.obj2 = obj2;
+  }
+
+
+  @Override
+  public boolean hasAnyNulls() {
+    if (obj1 == null) {
+      return true;
+    }
+    if (obj2 == null) {
+      return true;
+    }
+    return false;
+  }
+}

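MapJoinDoubleKeys gives a two-column join key value-based equals() and hashCode(), plus a hasAnyNulls() check that lets callers skip keys containing nulls. A short illustrative sketch follows (not part of the commit; DoubleKeySketch is a hypothetical class, and it assumes AbstractMapJoinKey from this patch series is available on the classpath):

import java.util.HashMap;

import org.apache.hadoop.hive.ql.exec.persistence.MapJoinDoubleKeys;

public class DoubleKeySketch {
  public static void main(String[] args) {
    HashMap<MapJoinDoubleKeys, String> table = new HashMap<MapJoinDoubleKeys, String>();
    table.put(new MapJoinDoubleKeys("dept", 42), "row payload");

    // A second key built from equal column values hashes and compares equal.
    String hit = table.get(new MapJoinDoubleKeys("dept", 42));
    System.out.println(hit); // prints "row payload"

    // Keys with a null column are flagged so join processing can skip them.
    System.out.println(new MapJoinDoubleKeys(null, 42).hasAnyNulls()); // prints "true"
  }
}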
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java?rev=1034276&r1=1034275&r2=1034276&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java Fri Nov 12 06:12:44 2010
@@ -18,14 +18,13 @@
 
 package org.apache.hadoop.hive.ql.exec.persistence;
 
-import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.ArrayList;
 
 import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
-import org.apache.hadoop.hive.ql.exec.JDBMSinkOperator.JDBMSinkObjectCtx;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
@@ -34,10 +33,10 @@ import org.apache.hadoop.io.Writable;
 /**
  * Map Join Object used for both key.
  */
-public class MapJoinObjectKey implements Externalizable {
+public class MapJoinObjectKey  extends AbstractMapJoinKey {
 
-  protected transient int metadataTag;
-  protected transient ArrayList<Object> obj;
+
+  protected transient Object[] obj;
 
   public MapJoinObjectKey() {
   }
@@ -46,8 +45,7 @@ public class MapJoinObjectKey implements
    * @param metadataTag
    * @param obj
    */
-  public MapJoinObjectKey(int metadataTag, ArrayList<Object> obj) {
-    this.metadataTag = metadataTag;
+  public MapJoinObjectKey(Object[] obj) {
     this.obj = obj;
   }
 
@@ -55,43 +53,60 @@ public class MapJoinObjectKey implements
   public boolean equals(Object o) {
     if (o instanceof MapJoinObjectKey) {
       MapJoinObjectKey mObj = (MapJoinObjectKey) o;
-      if (mObj.getMetadataTag() == metadataTag) {
-        if ((obj == null) && (mObj.getObj() == null)) {
-          return true;
-        }
-        if ((obj != null) && (mObj.getObj() != null)
-            && (mObj.getObj().equals(obj))) {
+      Object[] mObjArray = mObj.getObj();
+      if ((obj == null) && (mObjArray == null)) {
+        return true;
+      }
+      if ((obj != null) && (mObjArray != null)) {
+        if (obj.length == mObjArray.length) {
+          for (int i = 0; i < obj.length; i++) {
+            if (!obj[i].equals(mObjArray[i])) {
+              return false;
+            }
+          }
           return true;
         }
       }
     }
-
     return false;
   }
 
   @Override
   public int hashCode() {
-    return (obj == null) ? metadataTag : obj.hashCode();
+    int hashCode;
+    if (obj == null) {
+      hashCode = metadataTag;
+    } else {
+      hashCode = 1;
+
+      for (int i = 0; i < obj.length; i++) {
+        Object o = obj[i];
+        hashCode = 31 * hashCode + (o == null ? 0 : o.hashCode());
+      }
+
+    }
+    return hashCode;
   }
 
   @Override
   public void readExternal(ObjectInput in) throws IOException,
       ClassNotFoundException {
     try {
-      metadataTag = in.readInt();
-
       // get the tableDesc from the map stored in the mapjoin operator
-      JDBMSinkObjectCtx ctx = MapJoinMetaData.get(
+      HashTableSinkObjectCtx ctx = MapJoinMetaData.get(
           Integer.valueOf(metadataTag));
 
       Writable val = ctx.getSerDe().getSerializedClass().newInstance();
       val.readFields(in);
-      obj = (ArrayList<Object>) ObjectInspectorUtils.copyToStandardObject(ctx
+      ArrayList<Object> list = (ArrayList<Object>) ObjectInspectorUtils.copyToStandardObject(ctx
           .getSerDe().deserialize(val), ctx.getSerDe().getObjectInspector(),
           ObjectInspectorCopyOption.WRITABLE);
-      if(obj == null){
-        obj = new ArrayList<Object>(0);
+      if(list == null){
+        obj = new ArrayList(0).toArray();
+      }else{
+        obj = list.toArray();
       }
+
     } catch (Exception e) {
       throw new IOException(e);
     }
@@ -101,9 +116,8 @@ public class MapJoinObjectKey implements
   @Override
   public void writeExternal(ObjectOutput out) throws IOException {
     try {
-      out.writeInt(metadataTag);
       // get the tableDesc from the map stored in the mapjoin operator
-      JDBMSinkObjectCtx ctx = MapJoinMetaData.get(
+      HashTableSinkObjectCtx ctx = MapJoinMetaData.get(
           Integer.valueOf(metadataTag));
 
       // Different processing for key and value
@@ -114,25 +128,11 @@ public class MapJoinObjectKey implements
     }
   }
 
-  /**
-   * @return the metadataTag
-   */
-  public int getMetadataTag() {
-    return metadataTag;
-  }
-
-  /**
-   * @param metadataTag
-   *          the metadataTag to set
-   */
-  public void setMetadataTag(int metadataTag) {
-    this.metadataTag = metadataTag;
-  }
 
   /**
    * @return the obj
    */
-  public ArrayList<Object> getObj() {
+  public Object[] getObj() {
     return obj;
   }
 
@@ -140,8 +140,21 @@ public class MapJoinObjectKey implements
    * @param obj
    *          the obj to set
    */
-  public void setObj(ArrayList<Object> obj) {
+  public void setObj(Object[] obj) {
     this.obj = obj;
   }
 
+  @Override
+  public boolean hasAnyNulls(){
+    if (obj != null && obj.length> 0) {
+      for (Object k : obj) {
+        if (k == null) {
+          return true;
+        }
+      }
+    }
+    return false;
+
+  }
+
 }

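The MapJoinObjectKey change replaces the ArrayList<Object> key with an Object[] and compares elements itself, so two keys built from different array instances with equal contents still hash and compare equal. A hedged sketch of that behaviour (ObjectKeySketch is an illustrative name, not from the commit):

import java.util.HashMap;

import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectKey;

public class ObjectKeySketch {
  public static void main(String[] args) {
    HashMap<MapJoinObjectKey, String> table = new HashMap<MapJoinObjectKey, String>();
    table.put(new MapJoinObjectKey(new Object[] {"us", 2010}), "matched rows");

    // A second array instance with the same column values finds the entry,
    // which raw Object[] keys (identity-based hashCode) would not.
    String hit = table.get(new MapJoinObjectKey(new Object[] {"us", 2010}));
    System.out.println(hit); // prints "matched rows"
  }
}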
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java?rev=1034276&r1=1034275&r2=1034276&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java Fri Nov 12 06:12:44 2010
@@ -24,37 +24,33 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.ArrayList;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
-import org.apache.hadoop.hive.ql.exec.JDBMSinkOperator.JDBMSinkObjectCtx;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.io.Writable;
+
 /**
  * Map Join Object used for both key and value.
  */
 public class MapJoinObjectValue implements Externalizable {
 
   protected transient int metadataTag;
-  protected transient RowContainer obj;
-  protected transient Configuration conf;
-  protected int bucketSize; // bucket size for RowContainer
-  protected Log LOG = LogFactory.getLog(this.getClass().getName());
+  protected transient MapJoinRowContainer<Object[]> obj;
+
+
 
   public MapJoinObjectValue() {
-    bucketSize = 100; // default bucket size
+
   }
 
   /**
    * @param metadataTag
    * @param obj
    */
-  public MapJoinObjectValue(int metadataTag, RowContainer obj) {
+  public MapJoinObjectValue(int metadataTag, MapJoinRowContainer<Object[]> obj) {
     this.metadataTag = metadataTag;
     this.obj = obj;
   }
@@ -63,12 +59,12 @@ public class MapJoinObjectValue implemen
   public boolean equals(Object o) {
     if (o instanceof MapJoinObjectValue) {
       MapJoinObjectValue mObj = (MapJoinObjectValue) o;
+
       if (mObj.getMetadataTag() == metadataTag) {
         if ((obj == null) && (mObj.getObj() == null)) {
           return true;
         }
-        if ((obj != null) && (mObj.getObj() != null)
-            && (mObj.getObj().equals(obj))) {
+        if ((obj != null) && (mObj.getObj() != null) && (mObj.getObj().equals(obj))) {
           return true;
         }
       }
@@ -82,20 +78,16 @@ public class MapJoinObjectValue implemen
   }
 
   @Override
-  public void readExternal(ObjectInput in) throws IOException,
-      ClassNotFoundException {
+  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
     try {
 
       metadataTag = in.readInt();
 
       // get the tableDesc from the map stored in the mapjoin operator
-      JDBMSinkObjectCtx ctx = MapJoinMetaData.get(
-          Integer.valueOf(metadataTag));
+      HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
       int sz = in.readInt();
 
-      RowContainer res = new RowContainer(bucketSize, ctx.getConf());
-      res.setSerDe(ctx.getSerDe(), ctx.getStandardOI());
-      res.setTableDesc(ctx.getTblDesc());
+      MapJoinRowContainer<Object[]> res = new MapJoinRowContainer<Object[]>();
       if (sz > 0) {
         int numCols = in.readInt();
         if (numCols > 0) {
@@ -104,16 +96,18 @@ public class MapJoinObjectValue implemen
             val.readFields(in);
 
             ArrayList<Object> memObj = (ArrayList<Object>) ObjectInspectorUtils
-              .copyToStandardObject(ctx.getSerDe().deserialize(val), ctx
-              .getSerDe().getObjectInspector(),
-               ObjectInspectorCopyOption.WRITABLE);
+                .copyToStandardObject(ctx.getSerDe().deserialize(val), ctx.getSerDe()
+                    .getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
 
-            res.add(memObj);
+            if (memObj == null) {
+              res.add(new ArrayList<Object>(0).toArray());
+            } else {
+              res.add(memObj.toArray());
+            }
           }
-        }
-        else{
-          for(int i = 0 ; i <sz; i++){
-            res.add(new ArrayList<Object>(0));
+        } else {
+          for (int i = 0; i < sz; i++) {
+            res.add(new ArrayList<Object>(0).toArray());
           }
         }
       }
@@ -130,17 +124,16 @@ public class MapJoinObjectValue implemen
       out.writeInt(metadataTag);
 
       // get the tableDesc from the map stored in the mapjoin operator
-      JDBMSinkObjectCtx ctx = MapJoinMetaData.get(
-          Integer.valueOf(metadataTag));
+      HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
 
       // Different processing for key and value
-      RowContainer<ArrayList<Object>> v = obj;
+      MapJoinRowContainer<Object[]> v = obj;
       out.writeInt(v.size());
       if (v.size() > 0) {
-        ArrayList<Object> row = v.first();
-        out.writeInt(row.size());
+        Object[] row = v.first();
+        out.writeInt(row.length);
 
-        if (row.size() > 0) {
+        if (row.length > 0) {
           for (; row != null; row = v.next()) {
             Writable outVal = ctx.getSerDe().serialize(row, ctx.getStandardOI());
             outVal.write(out);
@@ -172,7 +165,7 @@ public class MapJoinObjectValue implemen
   /**
    * @return the obj
    */
-  public RowContainer getObj() {
+  public MapJoinRowContainer<Object[]>  getObj() {
     return obj;
   }
 
@@ -180,13 +173,8 @@ public class MapJoinObjectValue implemen
    * @param obj
    *          the obj to set
    */
-  public void setObj(RowContainer obj) {
+  public void setObj(MapJoinRowContainer<Object[]>  obj) {
     this.obj = obj;
   }
 
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-    bucketSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVEMAPJOINBUCKETCACHESIZE);
-  }
-
 }

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java?rev=1034276&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java Fri Nov 12 06:12:44 2010
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+public class MapJoinRowContainer<Row> extends AbstractRowContainer<Row> {
+
+  private List<Row> list;
+
+  private int index;
+
+  public MapJoinRowContainer() {
+    index = 0;
+    list = new ArrayList<Row>(1);
+  }
+
+  @Override
+  public void add(Row t) throws HiveException {
+    list.add(t);
+  }
+
+
+  @Override
+  public Row first() throws HiveException {
+    index = 0;
+    if (index < list.size()) {
+      return list.get(index);
+    }
+    return null;
+  }
+
+  @Override
+  public Row next() throws HiveException {
+    index++;
+    if (index < list.size()) {
+      return list.get(index);
+    }
+    return null;
+
+  }
+
+  /**
+   * Get the number of elements in the RowContainer.
+   *
+   * @return number of elements in the RowContainer
+   */
+  @Override
+  public int size() {
+    return list.size();
+  }
+
+  /**
+   * Remove all elements in the RowContainer.
+   */
+  @Override
+  public void clear() throws HiveException {
+    list.clear();
+    index = 0;
+  }
+
+  public List<Row> getList() {
+    return list;
+  }
+
+  public void setList(List<Row> list) {
+    this.list = list;
+  }
+
+  public void reset(MapJoinRowContainer<Object[]> other) throws HiveException {
+    list.clear();
+    Object[] obj;
+    for (obj = other.first(); obj != null; obj = other.next()) {
+      ArrayList<Object> ele = new ArrayList(obj.length);
+      for (int i = 0; i < obj.length; i++) {
+        ele.add(obj[i]);
+      }
+      list.add((Row) ele);
+    }
+  }
+}

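MapJoinRowContainer keeps its rows in an in-memory ArrayList and exposes the same lightweight first()/next() iteration API as RowContainer: first() resets the cursor, and both methods return null once the list is exhausted. A minimal sketch, assuming only the classes added in this commit; MapJoinRowContainerSketch is an illustrative name:

import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
import org.apache.hadoop.hive.ql.metadata.HiveException;

public class MapJoinRowContainerSketch {
  public static void main(String[] args) throws HiveException {
    MapJoinRowContainer<Object[]> rows = new MapJoinRowContainer<Object[]>();
    rows.add(new Object[] {"a", Integer.valueOf(1)});
    rows.add(new Object[] {"b", Integer.valueOf(2)});

    // first()/next() return null at the end, so no separate hasNext() call is needed.
    for (Object[] row = rows.first(); row != null; row = rows.next()) {
      System.out.println(row[0] + " -> " + row[1]);
    }

    rows.clear(); // drops all rows and resets the internal cursor
  }
}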
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java?rev=1034276&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java Fri Nov 12 06:12:44 2010
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.persistence;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+
+import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.io.Writable;
+
+public class MapJoinSingleKey extends AbstractMapJoinKey {
+
+
+  protected transient Object obj;
+
+  public MapJoinSingleKey() {
+  }
+
+  /**
+   * @param metadataTag
+   * @param obj
+   */
+  public MapJoinSingleKey(Object obj) {
+    this.obj = obj;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof MapJoinSingleKey) {
+      MapJoinSingleKey mObj = (MapJoinSingleKey) o;
+      Object key = mObj.getObj();
+      if ((obj == null) && (key == null)) {
+        return true;
+      }
+      if ((obj != null) && (key != null)) {
+        if (obj.equals(key)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    int hashCode;
+    if (obj == null) {
+      hashCode = metadataTag;
+    } else {
+      hashCode = 31 + obj.hashCode();
+    }
+    return hashCode;
+  }
+
+  @Override
+  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    try {
+      // get the tableDesc from the map stored in the mapjoin operator
+      HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+
+      Writable val = ctx.getSerDe().getSerializedClass().newInstance();
+      val.readFields(in);
+
+
+
+      ArrayList<Object> list = (ArrayList<Object>) ObjectInspectorUtils.copyToStandardObject(ctx
+          .getSerDe().deserialize(val), ctx.getSerDe().getObjectInspector(),
+          ObjectInspectorCopyOption.WRITABLE);
+
+      if (list == null) {
+        obj = null;
+        System.out.println("read empty back");
+      } else {
+        obj = list.get(0);
+      }
+
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
+  }
+
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    try {
+      // out.writeInt(metadataTag);
+      // get the tableDesc from the map stored in the mapjoin operator
+      HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+
+      ArrayList<Object> list = MapJoinMetaData.getList();
+      list.add(obj);
+
+      // Different processing for key and value
+      Writable outVal = ctx.getSerDe().serialize(list, ctx.getStandardOI());
+      outVal.write(out);
+
+    } catch (SerDeException e) {
+      throw new IOException(e);
+    }
+  }
+
+
+
+  /**
+   * @return the obj
+   */
+  public Object getObj() {
+    return obj;
+  }
+
+  /**
+   * @param obj
+   *          the obj to set
+   */
+  public void setObj(Object obj) {
+    this.obj = obj;
+  }
+
+  @Override
+  public boolean hasAnyNulls() {
+    if (obj == null) {
+      return true;
+    }
+    return false;
+  }
+
+
+
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=1034276&r1=1034275&r2=1034276&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Fri Nov 12 06:12:44 2010
@@ -51,31 +51,27 @@ import org.apache.hadoop.util.Reflection
 /**
  * Simple persistent container for rows.
  *
- * This container interface only accepts adding or appending new rows and
- * iterating through the rows in the order of their insertions.
+ * This container interface only accepts adding or appending new rows and iterating through the rows
+ * in the order of their insertions.
  *
- * The iterator interface is a lightweight first()/next() API rather than the
- * Java Iterator interface. This way we do not need to create an Iterator object
- * every time we want to start a new iteration. Below is simple example of how
- * to convert a typical Java's Iterator code to the LW iterator interface.
+ * The iterator interface is a lightweight first()/next() API rather than the Java Iterator
+ * interface. This way we do not need to create an Iterator object every time we want to start a new
+ * iteration. Below is simple example of how to convert a typical Java's Iterator code to the LW
+ * iterator interface.
  *
- * Iterator itr = rowContainer.iterator();
- * while (itr.hasNext()) {
- *   v = itr.next(); // do anything with v
- * }
+ * Iterator itr = rowContainer.iterator(); while (itr.hasNext()) { v = itr.next(); // do anything
+ * with v }
  *
  * can be rewritten to:
  *
- * for ( v = rowContainer.first(); v != null; v = rowContainer.next()) {
- *   // do anything with v
- * }
+ * for ( v = rowContainer.first(); v != null; v = rowContainer.next()) { // do anything with v }
  *
- * Once the first is called, it will not be able to write again. So there can
- * not be any writes after read. It can be read multiple times, but it does not
- * support multiple reader interleaving reading.
+ * Once the first is called, it will not be able to write again. So there can not be any writes
+ * after read. It can be read multiple times, but it does not support multiple reader interleaving
+ * reading.
  *
  */
-public class RowContainer<Row extends List<Object>> {
+public class RowContainer<Row extends List<Object>> extends AbstractRowContainer<Row> {
 
   protected static Log LOG = LogFactory.getLog(RowContainer.class);
 
@@ -120,6 +116,10 @@ public class RowContainer<Row extends Li
   JobConf jobCloneUsingLocalFs = null;
   private LocalFileSystem localFs;
 
+  public RowContainer() {
+
+  }
+
   public RowContainer(Configuration jc) throws HiveException {
     this(BLOCKSIZE, jc);
   }
@@ -137,21 +137,20 @@ public class RowContainer<Row extends Li
     this.firstReadBlockPointer = currentReadBlock;
     this.serde = null;
     this.standardOI = null;
-    this.jc=jc;
+    this.jc = jc;
   }
 
   private JobConf getLocalFSJobConfClone(Configuration jc) {
-    if(this.jobCloneUsingLocalFs == null) {
+    if (this.jobCloneUsingLocalFs == null) {
       this.jobCloneUsingLocalFs = new JobConf(jc);
-      HiveConf.setVar(jobCloneUsingLocalFs, HiveConf.ConfVars.HADOOPFS,
-          Utilities.HADOOP_LOCAL_FS);
+      HiveConf.setVar(jobCloneUsingLocalFs, HiveConf.ConfVars.HADOOPFS, Utilities.HADOOP_LOCAL_FS);
     }
     return this.jobCloneUsingLocalFs;
   }
 
 
-  public RowContainer(int blockSize, SerDe sd, ObjectInspector oi,
-      Configuration jc) throws HiveException {
+  public RowContainer(int blockSize, SerDe sd, ObjectInspector oi, Configuration jc)
+      throws HiveException {
     this(blockSize, jc);
     setSerDe(sd, oi);
   }
@@ -161,6 +160,7 @@ public class RowContainer<Row extends Li
     this.standardOI = oi;
   }
 
+  @Override
   public void add(Row t) throws HiveException {
     if (this.tblDesc != null) {
       if (addCursor >= blockSize) { // spill the current block to tmp file
@@ -180,6 +180,7 @@ public class RowContainer<Row extends Li
     ++size;
   }
 
+  @Override
   public Row first() throws HiveException {
     if (size == 0) {
       return null;
@@ -209,21 +210,17 @@ public class RowContainer<Row extends Li
         JobConf localJc = getLocalFSJobConfClone(jc);
         if (inputSplits == null) {
           if (this.inputFormat == null) {
-            inputFormat = (InputFormat<WritableComparable, Writable>) ReflectionUtils
-                .newInstance(tblDesc.getInputFileFormatClass(),
-                    localJc);
+            inputFormat = (InputFormat<WritableComparable, Writable>) ReflectionUtils.newInstance(
+                tblDesc.getInputFileFormatClass(), localJc);
           }
 
-          HiveConf.setVar(localJc,
-              HiveConf.ConfVars.HADOOPMAPREDINPUTDIR,
-              org.apache.hadoop.util.StringUtils.escapeString(parentFile
-              .getAbsolutePath()));
+          HiveConf.setVar(localJc, HiveConf.ConfVars.HADOOPMAPREDINPUTDIR,
+              org.apache.hadoop.util.StringUtils.escapeString(parentFile.getAbsolutePath()));
           inputSplits = inputFormat.getSplits(localJc, 1);
           acutalSplitNum = inputSplits.length;
         }
         currentSplitPointer = 0;
-        rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer],
-            localJc, Reporter.NULL);
+        rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], localJc, Reporter.NULL);
         currentSplitPointer++;
 
         nextBlock();
@@ -238,6 +235,7 @@ public class RowContainer<Row extends Li
 
   }
 
+  @Override
   public Row next() throws HiveException {
 
     if (!firstCalled) {
@@ -278,8 +276,7 @@ public class RowContainer<Row extends Li
   }
 
   private void removeKeys(Row ret) {
-    if (this.keyObject != null
-        && this.currentReadBlock != this.currentWriteBlock) {
+    if (this.keyObject != null && this.currentReadBlock != this.currentWriteBlock) {
       int len = this.keyObject.size();
       int rowSize = ((ArrayList) ret).size();
       for (int i = 0; i < len; i++) {
@@ -317,16 +314,13 @@ public class RowContainer<Row extends Li
         tmpFile.deleteOnExit();
 
         // rFile = new RandomAccessFile(tmpFile, "rw");
-        HiveOutputFormat<?, ?> hiveOutputFormat = tblDesc
-            .getOutputFileFormatClass().newInstance();
+        HiveOutputFormat<?, ?> hiveOutputFormat = tblDesc.getOutputFileFormatClass().newInstance();
         tempOutPath = new Path(tmpFile.toString());
         JobConf localJc = getLocalFSJobConfClone(jc);
-        rw = HiveFileFormatUtils.getRecordWriter(this.jobCloneUsingLocalFs,
-            hiveOutputFormat, serde.getSerializedClass(), false, tblDesc
-            .getProperties(), tempOutPath);
+        rw = HiveFileFormatUtils.getRecordWriter(this.jobCloneUsingLocalFs, hiveOutputFormat, serde
+            .getSerializedClass(), false, tblDesc.getProperties(), tempOutPath);
       } else if (rw == null) {
-        throw new HiveException(
-            "RowContainer has already been closed for writing.");
+        throw new HiveException("RowContainer has already been closed for writing.");
       }
 
       row.clear();
@@ -366,6 +360,7 @@ public class RowContainer<Row extends Li
    *
    * @return number of elements in the RowContainer
    */
+  @Override
   public int size() {
     return size;
   }
@@ -388,17 +383,16 @@ public class RowContainer<Row extends Li
         Object key = rr.createKey();
         while (i < this.currentReadBlock.length && rr.next(key, val)) {
           nextSplit = false;
-          this.currentReadBlock[i++] = (Row) ObjectInspectorUtils
-              .copyToStandardObject(serde.deserialize(val), serde
-              .getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
+          this.currentReadBlock[i++] = (Row) ObjectInspectorUtils.copyToStandardObject(serde
+              .deserialize(val), serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
         }
       }
 
       if (nextSplit && this.currentSplitPointer < this.acutalSplitNum) {
         JobConf localJc = getLocalFSJobConfClone(jc);
         // open record reader to read next split
-        rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer],
-            jobCloneUsingLocalFs, Reporter.NULL);
+        rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], jobCloneUsingLocalFs,
+            Reporter.NULL);
         currentSplitPointer++;
         return nextBlock();
       }
@@ -416,8 +410,7 @@ public class RowContainer<Row extends Li
     }
   }
 
-  public void copyToDFSDirecory(FileSystem destFs, Path destPath)
-      throws IOException, HiveException {
+  public void copyToDFSDirecory(FileSystem destFs, Path destPath) throws IOException, HiveException {
     if (addCursor > 0) {
       this.spillBlock(this.currentWriteBlock, addCursor);
     }
@@ -425,16 +418,17 @@ public class RowContainer<Row extends Li
       return;
     }
     this.closeWriter();
-    LOG.info("RowContainer copied temp file " + tmpFile.getAbsolutePath()
-        + " to dfs directory " + destPath.toString());
-    destFs.copyFromLocalFile(true, tempOutPath, new Path(destPath, new Path(
-        tempOutPath.getName())));
+    LOG.info("RowContainer copied temp file " + tmpFile.getAbsolutePath() + " to dfs directory "
+        + destPath.toString());
+    destFs
+        .copyFromLocalFile(true, tempOutPath, new Path(destPath, new Path(tempOutPath.getName())));
     clear();
   }
 
   /**
    * Remove all elements in the RowContainer.
    */
+  @Override
   public void clear() throws HiveException {
     itrCursor = 0;
     addCursor = 0;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java?rev=1034276&r1=1034275&r2=1034276&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LocalMapJoinProcFactory.java Fri Nov 12 06:12:44 2010
@@ -23,8 +23,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Stack;
 
-import org.apache.hadoop.hive.ql.exec.JDBMDummyOperator;
-import org.apache.hadoop.hive.ql.exec.JDBMSinkOperator;
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -35,8 +35,8 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.optimizer.physical.MapJoinResolver.LocalMapJoinProcCtx;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.JDBMDummyDesc;
-import org.apache.hadoop.hive.ql.plan.JDBMSinkDesc;
+import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc;
+import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 
@@ -78,10 +78,8 @@ public final class LocalMapJoinProcFacto
       }
       MapJoinOperator mapJoinOp = (MapJoinOperator) nd;
 
-      //create an new operator: JDBMSinkOperator
-      JDBMSinkDesc jdbmSinkDesc = new JDBMSinkDesc(mapJoinOp.getConf());
-      JDBMSinkOperator jdbmSinkOp =(JDBMSinkOperator)OperatorFactory.get(jdbmSinkDesc);
-
+      HashTableSinkDesc hashTableSinkDesc = new HashTableSinkDesc(mapJoinOp.getConf());
+      HashTableSinkOperator hashTableSinkOp =(HashTableSinkOperator)OperatorFactory.get(hashTableSinkDesc);
 
       //get the last operator for processing big tables
       int bigTable = mapJoinOp.getConf().getPosBigTable();
@@ -90,7 +88,7 @@ public final class LocalMapJoinProcFacto
 
       Operator<? extends Serializable> bigOp = mapJoinOp.getParentOperators().get(bigTable);
 
-      //the parent ops for jdbmSinkOp
+      //the parent ops for hashTableSinkOp
       List<Operator<?extends Serializable>> smallTablesParentOp= new ArrayList<Operator<?extends Serializable>>();
 
       List<Operator<?extends Serializable>> dummyOperators= new ArrayList<Operator<?extends Serializable>>();
@@ -103,14 +101,14 @@ public final class LocalMapJoinProcFacto
         }
 
         Operator<? extends Serializable> parent = parentsOp.get(i);
-        //let jdbmOp be the child of this parent
-        parent.replaceChild(mapJoinOp, jdbmSinkOp);
+        //let the HashTableSinkOperator be the child of this parent
+        parent.replaceChild(mapJoinOp, hashTableSinkOp);
         //keep the parent id correct
         smallTablesParentOp.add(parent);
 
-        //create an new operator: JDBMDummyOpeator, which share the table desc
-        JDBMDummyDesc desc = new JDBMDummyDesc();
-        JDBMDummyOperator dummyOp =(JDBMDummyOperator)OperatorFactory.get(desc);
+        //create a new operator: HashTableDummyOperator, which shares the table desc
+        HashTableDummyDesc desc = new HashTableDummyDesc();
+        HashTableDummyOperator dummyOp =(HashTableDummyOperator)OperatorFactory.get(desc);
         TableDesc tbl;
 
         if(parent.getSchema()==null){
@@ -140,7 +138,7 @@ public final class LocalMapJoinProcFacto
 
       }
 
-      jdbmSinkOp.setParentOperators(smallTablesParentOp);
+      hashTableSinkOp.setParentOperators(smallTablesParentOp);
       for(Operator<? extends Serializable> op: dummyOperators){
         context.addDummyParentOp(op);
       }

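In short, the rewritten processor detaches every small-table parent from the MapJoinOperator and re-points it at a freshly created HashTableSinkOperator, while the big-table branch keeps feeding the map-join directly; a HashTableDummyOperator is then added per small table to carry the table desc on the map side. A minimal sketch of that rewiring, using only the operator and descriptor APIs visible in the hunk above and omitting the dummy-operator and schema wiring:

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
    import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.OperatorFactory;
    import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;

    public final class HashTableSinkRewireSketch {
      // Re-point all small-table parents of mapJoinOp at a new HashTableSinkOperator.
      static HashTableSinkOperator rewireSmallTables(MapJoinOperator mapJoinOp) {
        HashTableSinkDesc sinkDesc = new HashTableSinkDesc(mapJoinOp.getConf());
        HashTableSinkOperator sinkOp = (HashTableSinkOperator) OperatorFactory.get(sinkDesc);

        int bigTable = mapJoinOp.getConf().getPosBigTable();
        List<Operator<? extends Serializable>> parents = mapJoinOp.getParentOperators();
        List<Operator<? extends Serializable>> smallTableParents =
            new ArrayList<Operator<? extends Serializable>>();

        for (int i = 0; i < parents.size(); i++) {
          if (i == bigTable) {
            continue; // the big-table branch still feeds the map-join itself
          }
          Operator<? extends Serializable> parent = parents.get(i);
          parent.replaceChild(mapJoinOp, sinkOp); // this small-table branch now ends in the sink
          smallTableParents.add(parent);
        }
        sinkOp.setParentOperators(smallTableParents);
        return sinkOp;
      }
    }
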
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java?rev=1034276&r1=1034275&r2=1034276&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapJoinResolver.java Fri Nov 12 06:12:44 2010
@@ -26,13 +26,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Stack;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.MapredLocalTask;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -97,9 +97,10 @@ public class MapJoinResolver implements 
       if(localwork != null){
         //get the context info and set up the shared tmp URI
         Context ctx = physicalContext.getContext();
-        String tmpFileURI = ctx.getLocalTmpFileURI()+Path.SEPARATOR+"JDBM-"+currTask.getId();
+        String tmpFileURI = Utilities.generateTmpURI(ctx.getLocalTmpFileURI(), currTask.getId());
         localwork.setTmpFileURI(tmpFileURI);
-        mapredWork.setTmpHDFSFileURI(ctx.getMRTmpFileURI()+Path.SEPARATOR+"JDBM-"+currTask.getId());
+        String hdfsTmpURI = Utilities.generateTmpURI(ctx.getMRTmpFileURI(), currTask.getId());
+        mapredWork.setTmpHDFSFileURI(hdfsTmpURI);
         //create a task for this local work; right now, this local work is shared
         //by the original MapredTask and this new generated MapredLocalTask.
         MapredLocalTask localTask = (MapredLocalTask) TaskFactory.get(localwork,

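The hunk above also drops the hard-coded "JDBM-" suffix and delegates per-task temporary URI construction to Utilities.generateTmpURI for both the local and the HDFS path. The helper's body is not part of this hunk; a hypothetical implementation of that shape (the "HashTable-" prefix is an assumption, not taken from the patch) would be:

    import org.apache.hadoop.fs.Path;

    public final class TmpUriSketch {
      // Hypothetical stand-in for Utilities.generateTmpURI; the real helper may differ.
      public static String generateTmpURI(String baseURI, String id) {
        // e.g. baseURI = "file:/tmp/hive-local", id = "Stage-3"
        return baseURI + Path.SEPARATOR + "HashTable-" + id;
      }
    }
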
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableDummyDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableDummyDesc.java?rev=1034276&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableDummyDesc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableDummyDesc.java Fri Nov 12 06:12:44 2010
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+/**
+ * HashTable Dummy Descriptor implementation.
+ *
+ */
+@Explain(displayName = "HashTable Dummy Operator")
+public class HashTableDummyDesc implements Serializable {
+  private TableDesc tbl;
+
+  public TableDesc getTbl() {
+    return tbl;
+  }
+
+  public void setTbl(TableDesc tbl) {
+    this.tbl = tbl;
+  }
+
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java?rev=1034276&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableSinkDesc.java Fri Nov 12 06:12:44 2010
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+/**
+ * HashTable Sink operator Descriptor implementation.
+ *
+ */
+@Explain(displayName = "HashTable Sink Operator")
+public class HashTableSinkDesc extends JoinDesc implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+
+  // used to handle skew join
+  private boolean handleSkewJoin = false;
+  private int skewKeyDefinition = -1;
+  private Map<Byte, String> bigKeysDirMap;
+  private Map<Byte, Map<Byte, String>> smallKeysDirMap;
+  private Map<Byte, TableDesc> skewKeysValuesTables;
+
+  // alias to key mapping
+  private Map<Byte, List<ExprNodeDesc>> exprs;
+
+  // alias to filter mapping
+  private Map<Byte, List<ExprNodeDesc>> filters;
+
+  // used for create joinOutputObjectInspector
+  protected List<String> outputColumnNames;
+
+  // key:column output name, value:tag
+  private transient Map<String, Byte> reversedExprs;
+
+  // No outer join involved
+  protected boolean noOuterJoin;
+
+  protected JoinCondDesc[] conds;
+
+  protected Byte[] tagOrder;
+  private TableDesc keyTableDesc;
+
+
+  private Map<Byte, List<ExprNodeDesc>> keys;
+  private TableDesc keyTblDesc;
+  private List<TableDesc> valueTblDescs;
+
+  private int posBigTable;
+
+  private Map<Byte, List<Integer>> retainList;
+
+  private transient String bigTableAlias;
+
+  private LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> aliasBucketFileNameMapping;
+  private LinkedHashMap<String, Integer> bucketFileNameMapping;
+
+  public HashTableSinkDesc() {
+    bucketFileNameMapping = new LinkedHashMap<String, Integer>();
+  }
+
+  public HashTableSinkDesc(MapJoinDesc clone) {
+    this.bigKeysDirMap = clone.getBigKeysDirMap();
+    this.conds = clone.getConds();
+    this.exprs= clone.getExprs();
+    this.handleSkewJoin = clone.getHandleSkewJoin();
+    this.keyTableDesc = clone.getKeyTableDesc();
+    this.noOuterJoin = clone.getNoOuterJoin();
+    this.outputColumnNames = clone.getOutputColumnNames();
+    this.reversedExprs = clone.getReversedExprs();
+    this.skewKeyDefinition = clone.getSkewKeyDefinition();
+    this.skewKeysValuesTables = clone.getSkewKeysValuesTables();
+    this.smallKeysDirMap = clone.getSmallKeysDirMap();
+    this.tagOrder = clone.getTagOrder();
+    this.filters = clone.getFilters();
+
+    this.keys = clone.getKeys();
+    this.keyTblDesc = clone.getKeyTblDesc();
+    this.valueTblDescs = clone.getValueTblDescs();
+    this.posBigTable = clone.getPosBigTable();
+    this.retainList = clone.getRetainList();
+    this.bigTableAlias = clone.getBigTableAlias();
+    this.aliasBucketFileNameMapping = clone.getAliasBucketFileNameMapping();
+    this.bucketFileNameMapping = clone.getBucketFileNameMapping();
+  }
+
+
+  private void initRetainExprList() {
+    retainList = new HashMap<Byte, List<Integer>>();
+    Set<Entry<Byte, List<ExprNodeDesc>>> set = exprs.entrySet();
+    Iterator<Entry<Byte, List<ExprNodeDesc>>> setIter = set.iterator();
+    while (setIter.hasNext()) {
+      Entry<Byte, List<ExprNodeDesc>> current = setIter.next();
+      List<Integer> list = new ArrayList<Integer>();
+      for (int i = 0; i < current.getValue().size(); i++) {
+        list.add(i);
+      }
+      retainList.put(current.getKey(), list);
+    }
+  }
+
+  public boolean isHandleSkewJoin() {
+    return handleSkewJoin;
+  }
+
+  @Override
+  public void setHandleSkewJoin(boolean handleSkewJoin) {
+    this.handleSkewJoin = handleSkewJoin;
+  }
+
+  @Override
+  public int getSkewKeyDefinition() {
+    return skewKeyDefinition;
+  }
+
+  @Override
+  public void setSkewKeyDefinition(int skewKeyDefinition) {
+    this.skewKeyDefinition = skewKeyDefinition;
+  }
+
+  @Override
+  public Map<Byte, String> getBigKeysDirMap() {
+    return bigKeysDirMap;
+  }
+
+  @Override
+  public void setBigKeysDirMap(Map<Byte, String> bigKeysDirMap) {
+    this.bigKeysDirMap = bigKeysDirMap;
+  }
+
+  @Override
+  public Map<Byte, Map<Byte, String>> getSmallKeysDirMap() {
+    return smallKeysDirMap;
+  }
+
+  @Override
+  public void setSmallKeysDirMap(Map<Byte, Map<Byte, String>> smallKeysDirMap) {
+    this.smallKeysDirMap = smallKeysDirMap;
+  }
+
+  @Override
+  public Map<Byte, TableDesc> getSkewKeysValuesTables() {
+    return skewKeysValuesTables;
+  }
+
+  @Override
+  public void setSkewKeysValuesTables(Map<Byte, TableDesc> skewKeysValuesTables) {
+    this.skewKeysValuesTables = skewKeysValuesTables;
+  }
+
+  @Override
+  public Map<Byte, List<ExprNodeDesc>> getExprs() {
+    return exprs;
+  }
+
+  @Override
+  public void setExprs(Map<Byte, List<ExprNodeDesc>> exprs) {
+    this.exprs = exprs;
+  }
+
+  @Override
+  public Map<Byte, List<ExprNodeDesc>> getFilters() {
+    return filters;
+  }
+
+  @Override
+  public void setFilters(Map<Byte, List<ExprNodeDesc>> filters) {
+    this.filters = filters;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return outputColumnNames;
+  }
+
+  @Override
+  public void setOutputColumnNames(List<String> outputColumnNames) {
+    this.outputColumnNames = outputColumnNames;
+  }
+
+  @Override
+  public Map<String, Byte> getReversedExprs() {
+    return reversedExprs;
+  }
+
+  @Override
+  public void setReversedExprs(Map<String, Byte> reversedExprs) {
+    this.reversedExprs = reversedExprs;
+  }
+
+  @Override
+  public boolean isNoOuterJoin() {
+    return noOuterJoin;
+  }
+
+  @Override
+  public void setNoOuterJoin(boolean noOuterJoin) {
+    this.noOuterJoin = noOuterJoin;
+  }
+
+  @Override
+  public JoinCondDesc[] getConds() {
+    return conds;
+  }
+
+  @Override
+  public void setConds(JoinCondDesc[] conds) {
+    this.conds = conds;
+  }
+
+  @Override
+  public Byte[] getTagOrder() {
+    return tagOrder;
+  }
+
+  @Override
+  public void setTagOrder(Byte[] tagOrder) {
+    this.tagOrder = tagOrder;
+  }
+
+  @Override
+  public TableDesc getKeyTableDesc() {
+    return keyTableDesc;
+  }
+
+  @Override
+  public void setKeyTableDesc(TableDesc keyTableDesc) {
+    this.keyTableDesc = keyTableDesc;
+  }
+
+
+  public Map<Byte, List<Integer>> getRetainList() {
+    return retainList;
+  }
+
+  public void setRetainList(Map<Byte, List<Integer>> retainList) {
+    this.retainList = retainList;
+  }
+
+  /**
+   * @return the keys
+   */
+  @Explain(displayName = "keys")
+  public Map<Byte, List<ExprNodeDesc>> getKeys() {
+    return keys;
+  }
+
+  /**
+   * @param keys
+   *          the keys to set
+   */
+  public void setKeys(Map<Byte, List<ExprNodeDesc>> keys) {
+    this.keys = keys;
+  }
+
+  /**
+   * @return the position of the big table not in memory
+   */
+  @Explain(displayName = "Position of Big Table")
+  public int getPosBigTable() {
+    return posBigTable;
+  }
+
+  /**
+   * @param posBigTable
+   *          the position of the big table not in memory
+   */
+  public void setPosBigTable(int posBigTable) {
+    this.posBigTable = posBigTable;
+  }
+
+  /**
+   * @return the keyTblDesc
+   */
+  public TableDesc getKeyTblDesc() {
+    return keyTblDesc;
+  }
+
+  /**
+   * @param keyTblDesc
+   *          the keyTblDesc to set
+   */
+  public void setKeyTblDesc(TableDesc keyTblDesc) {
+    this.keyTblDesc = keyTblDesc;
+  }
+
+  /**
+   * @return the valueTblDescs
+   */
+  public List<TableDesc> getValueTblDescs() {
+    return valueTblDescs;
+  }
+
+  /**
+   * @param valueTblDescs
+   *          the valueTblDescs to set
+   */
+  public void setValueTblDescs(List<TableDesc> valueTblDescs) {
+    this.valueTblDescs = valueTblDescs;
+  }
+
+  /**
+   * @return bigTableAlias
+   */
+  public String getBigTableAlias() {
+    return bigTableAlias;
+  }
+
+  /**
+   * @param bigTableAlias
+   */
+  public void setBigTableAlias(String bigTableAlias) {
+    this.bigTableAlias = bigTableAlias;
+  }
+
+  public LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> getAliasBucketFileNameMapping() {
+    return aliasBucketFileNameMapping;
+  }
+
+  public void setAliasBucketFileNameMapping(
+      LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> aliasBucketFileNameMapping) {
+    this.aliasBucketFileNameMapping = aliasBucketFileNameMapping;
+  }
+
+  public LinkedHashMap<String, Integer> getBucketFileNameMapping() {
+    return bucketFileNameMapping;
+  }
+
+  public void setBucketFileNameMapping(LinkedHashMap<String, Integer> bucketFileNameMapping) {
+    this.bucketFileNameMapping = bucketFileNameMapping;
+  }
+}

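For reference, initRetainExprList() in the class above records, per join-input tag, the indexes of all value expressions for that tag, i.e. every column is retained. A small hedged illustration of the map it produces for a tag with two value expressions:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public final class RetainListSketch {
      public static void main(String[] args) {
        // Equivalent of what initRetainExprList() builds for input tag 1 with two value columns.
        Map<Byte, List<Integer>> retainList = new HashMap<Byte, List<Integer>>();
        retainList.put((byte) 1, Arrays.asList(0, 1)); // retain value columns 0 and 1 of tag 1
        System.out.println(retainList);                // prints {1=[0, 1]}
      }
    }
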
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JDBMDummyDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JDBMDummyDesc.java?rev=1034276&r1=1034275&r2=1034276&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JDBMDummyDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JDBMDummyDesc.java Fri Nov 12 06:12:44 2010
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.plan;
-
-import java.io.Serializable;
-/**
- * JDBM Dummy Descriptor implementation.
- *
- */
-@Explain(displayName = "JDBMDummy Operator")
-public class JDBMDummyDesc implements Serializable {
-  private TableDesc tbl;
-
-  public TableDesc getTbl() {
-    return tbl;
-  }
-
-  public void setTbl(TableDesc tbl) {
-    this.tbl = tbl;
-  }
-
-}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JDBMSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JDBMSinkDesc.java?rev=1034276&r1=1034275&r2=1034276&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JDBMSinkDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JDBMSinkDesc.java Fri Nov 12 06:12:44 2010
@@ -1,355 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.plan;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Map.Entry;
-
-/**
- * Map Join operator Descriptor implementation.
- *
- */
-@Explain(displayName = "JDBM Sink Operator")
-public class JDBMSinkDesc extends JoinDesc implements Serializable {
-  private static final long serialVersionUID = 1L;
-
-
-  // used to handle skew join
-  private boolean handleSkewJoin = false;
-  private int skewKeyDefinition = -1;
-  private Map<Byte, String> bigKeysDirMap;
-  private Map<Byte, Map<Byte, String>> smallKeysDirMap;
-  private Map<Byte, TableDesc> skewKeysValuesTables;
-
-  // alias to key mapping
-  private Map<Byte, List<ExprNodeDesc>> exprs;
-
-  // alias to filter mapping
-  private Map<Byte, List<ExprNodeDesc>> filters;
-
-  // used for create joinOutputObjectInspector
-  protected List<String> outputColumnNames;
-
-  // key:column output name, value:tag
-  private transient Map<String, Byte> reversedExprs;
-
-  // No outer join involved
-  protected boolean noOuterJoin;
-
-  protected JoinCondDesc[] conds;
-
-  protected Byte[] tagOrder;
-  private TableDesc keyTableDesc;
-
-
-  private Map<Byte, List<ExprNodeDesc>> keys;
-  private TableDesc keyTblDesc;
-  private List<TableDesc> valueTblDescs;
-
-  private int posBigTable;
-
-  private Map<Byte, List<Integer>> retainList;
-
-  private transient String bigTableAlias;
-
-  private LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> aliasBucketFileNameMapping;
-  private LinkedHashMap<String, Integer> bucketFileNameMapping;
-
-  public JDBMSinkDesc() {
-    bucketFileNameMapping = new LinkedHashMap<String, Integer>();
-  }
-
-  public JDBMSinkDesc(MapJoinDesc clone) {
-    this.bigKeysDirMap = clone.getBigKeysDirMap();
-    this.conds = clone.getConds();
-    this.exprs= clone.getExprs();
-    this.handleSkewJoin = clone.getHandleSkewJoin();
-    this.keyTableDesc = clone.getKeyTableDesc();
-    this.noOuterJoin = clone.getNoOuterJoin();
-    this.outputColumnNames = clone.getOutputColumnNames();
-    this.reversedExprs = clone.getReversedExprs();
-    this.skewKeyDefinition = clone.getSkewKeyDefinition();
-    this.skewKeysValuesTables = clone.getSkewKeysValuesTables();
-    this.smallKeysDirMap = clone.getSmallKeysDirMap();
-    this.tagOrder = clone.getTagOrder();
-    this.filters = clone.getFilters();
-
-    this.keys = clone.getKeys();
-    this.keyTblDesc = clone.getKeyTblDesc();
-    this.valueTblDescs = clone.getValueTblDescs();
-    this.posBigTable = clone.getPosBigTable();
-    this.retainList = clone.getRetainList();
-    this.bigTableAlias = clone.getBigTableAlias();
-    this.aliasBucketFileNameMapping = clone.getAliasBucketFileNameMapping();
-    this.bucketFileNameMapping = clone.getBucketFileNameMapping();
-  }
-
-
-  private void initRetainExprList() {
-    retainList = new HashMap<Byte, List<Integer>>();
-    Set<Entry<Byte, List<ExprNodeDesc>>> set = exprs.entrySet();
-    Iterator<Entry<Byte, List<ExprNodeDesc>>> setIter = set.iterator();
-    while (setIter.hasNext()) {
-      Entry<Byte, List<ExprNodeDesc>> current = setIter.next();
-      List<Integer> list = new ArrayList<Integer>();
-      for (int i = 0; i < current.getValue().size(); i++) {
-        list.add(i);
-      }
-      retainList.put(current.getKey(), list);
-    }
-  }
-
-  public boolean isHandleSkewJoin() {
-    return handleSkewJoin;
-  }
-
-  @Override
-  public void setHandleSkewJoin(boolean handleSkewJoin) {
-    this.handleSkewJoin = handleSkewJoin;
-  }
-
-  @Override
-  public int getSkewKeyDefinition() {
-    return skewKeyDefinition;
-  }
-
-  @Override
-  public void setSkewKeyDefinition(int skewKeyDefinition) {
-    this.skewKeyDefinition = skewKeyDefinition;
-  }
-
-  @Override
-  public Map<Byte, String> getBigKeysDirMap() {
-    return bigKeysDirMap;
-  }
-
-  @Override
-  public void setBigKeysDirMap(Map<Byte, String> bigKeysDirMap) {
-    this.bigKeysDirMap = bigKeysDirMap;
-  }
-
-  @Override
-  public Map<Byte, Map<Byte, String>> getSmallKeysDirMap() {
-    return smallKeysDirMap;
-  }
-
-  @Override
-  public void setSmallKeysDirMap(Map<Byte, Map<Byte, String>> smallKeysDirMap) {
-    this.smallKeysDirMap = smallKeysDirMap;
-  }
-
-  @Override
-  public Map<Byte, TableDesc> getSkewKeysValuesTables() {
-    return skewKeysValuesTables;
-  }
-
-  @Override
-  public void setSkewKeysValuesTables(Map<Byte, TableDesc> skewKeysValuesTables) {
-    this.skewKeysValuesTables = skewKeysValuesTables;
-  }
-
-  @Override
-  public Map<Byte, List<ExprNodeDesc>> getExprs() {
-    return exprs;
-  }
-
-  @Override
-  public void setExprs(Map<Byte, List<ExprNodeDesc>> exprs) {
-    this.exprs = exprs;
-  }
-
-  @Override
-  public Map<Byte, List<ExprNodeDesc>> getFilters() {
-    return filters;
-  }
-
-  @Override
-  public void setFilters(Map<Byte, List<ExprNodeDesc>> filters) {
-    this.filters = filters;
-  }
-
-  @Override
-  public List<String> getOutputColumnNames() {
-    return outputColumnNames;
-  }
-
-  @Override
-  public void setOutputColumnNames(List<String> outputColumnNames) {
-    this.outputColumnNames = outputColumnNames;
-  }
-
-  @Override
-  public Map<String, Byte> getReversedExprs() {
-    return reversedExprs;
-  }
-
-  @Override
-  public void setReversedExprs(Map<String, Byte> reversedExprs) {
-    this.reversedExprs = reversedExprs;
-  }
-
-  @Override
-  public boolean isNoOuterJoin() {
-    return noOuterJoin;
-  }
-
-  @Override
-  public void setNoOuterJoin(boolean noOuterJoin) {
-    this.noOuterJoin = noOuterJoin;
-  }
-
-  @Override
-  public JoinCondDesc[] getConds() {
-    return conds;
-  }
-
-  @Override
-  public void setConds(JoinCondDesc[] conds) {
-    this.conds = conds;
-  }
-
-  @Override
-  public Byte[] getTagOrder() {
-    return tagOrder;
-  }
-
-  @Override
-  public void setTagOrder(Byte[] tagOrder) {
-    this.tagOrder = tagOrder;
-  }
-
-  @Override
-  public TableDesc getKeyTableDesc() {
-    return keyTableDesc;
-  }
-
-  @Override
-  public void setKeyTableDesc(TableDesc keyTableDesc) {
-    this.keyTableDesc = keyTableDesc;
-  }
-
-
-  public Map<Byte, List<Integer>> getRetainList() {
-    return retainList;
-  }
-
-  public void setRetainList(Map<Byte, List<Integer>> retainList) {
-    this.retainList = retainList;
-  }
-
-  /**
-   * @return the keys
-   */
-  @Explain(displayName = "keys")
-  public Map<Byte, List<ExprNodeDesc>> getKeys() {
-    return keys;
-  }
-
-  /**
-   * @param keys
-   *          the keys to set
-   */
-  public void setKeys(Map<Byte, List<ExprNodeDesc>> keys) {
-    this.keys = keys;
-  }
-
-  /**
-   * @return the position of the big table not in memory
-   */
-  @Explain(displayName = "Position of Big Table")
-  public int getPosBigTable() {
-    return posBigTable;
-  }
-
-  /**
-   * @param posBigTable
-   *          the position of the big table not in memory
-   */
-  public void setPosBigTable(int posBigTable) {
-    this.posBigTable = posBigTable;
-  }
-
-  /**
-   * @return the keyTblDesc
-   */
-  public TableDesc getKeyTblDesc() {
-    return keyTblDesc;
-  }
-
-  /**
-   * @param keyTblDesc
-   *          the keyTblDesc to set
-   */
-  public void setKeyTblDesc(TableDesc keyTblDesc) {
-    this.keyTblDesc = keyTblDesc;
-  }
-
-  /**
-   * @return the valueTblDescs
-   */
-  public List<TableDesc> getValueTblDescs() {
-    return valueTblDescs;
-  }
-
-  /**
-   * @param valueTblDescs
-   *          the valueTblDescs to set
-   */
-  public void setValueTblDescs(List<TableDesc> valueTblDescs) {
-    this.valueTblDescs = valueTblDescs;
-  }
-
-  /**
-   * @return bigTableAlias
-   */
-  public String getBigTableAlias() {
-    return bigTableAlias;
-  }
-
-  /**
-   * @param bigTableAlias
-   */
-  public void setBigTableAlias(String bigTableAlias) {
-    this.bigTableAlias = bigTableAlias;
-  }
-
-  public LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> getAliasBucketFileNameMapping() {
-    return aliasBucketFileNameMapping;
-  }
-
-  public void setAliasBucketFileNameMapping(
-      LinkedHashMap<String, LinkedHashMap<String, ArrayList<String>>> aliasBucketFileNameMapping) {
-    this.aliasBucketFileNameMapping = aliasBucketFileNameMapping;
-  }
-
-  public LinkedHashMap<String, Integer> getBucketFileNameMapping() {
-    return bucketFileNameMapping;
-  }
-
-  public void setBucketFileNameMapping(LinkedHashMap<String, Integer> bucketFileNameMapping) {
-    this.bucketFileNameMapping = bucketFileNameMapping;
-  }
-}


