From prasan...@apache.org
Subject [12/52] [abbrv] hive git commit: HIVE-10456: Grace Hash Join should not load spilled partitions on abort (Prasanth Jayachandran reviewed by Gunther Hagleitner)
Date Thu, 07 May 2015 01:21:06 GMT
HIVE-10456: Grace Hash Join should not load spilled partitions on abort (Prasanth Jayachandran reviewed by Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/07fcb098
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/07fcb098
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/07fcb098

Branch: refs/heads/llap
Commit: 07fcb098b63003cf74718351269c79870100b8de
Parents: 77b7fc3
Author: Prasanth Jayachandran <j.prasanth.j@gmail.com>
Authored: Sat May 2 17:40:01 2015 -0700
Committer: Prasanth Jayachandran <j.prasanth.j@gmail.com>
Committed: Sat May 2 17:40:01 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/MapJoinOperator.java    | 136 +++++++++++--------
 .../apache/hadoop/hive/ql/exec/ObjectCache.java |   7 +
 .../hadoop/hive/ql/exec/mr/ObjectCache.java     |   5 +
 .../persistence/HybridHashTableContainer.java   |  35 ++++-
 .../hive/ql/exec/tez/HashTableLoader.java       |   5 -
 .../hadoop/hive/ql/exec/tez/ObjectCache.java    |   6 +
 .../mapjoin/VectorMapJoinRowBytesContainer.java |   2 +-
 7 files changed, 131 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
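
The heart of the patch is a cleanup-on-failure pattern: wrap the hash-table load in a try/catch, clear every container (including partitions already spilled to disk) when the load aborts partway, and rethrow. A minimal, self-contained Java sketch of that pattern follows; the names (LoadWithCleanupSketch, TableContainer, loadAll) are illustrative stand-ins, not Hive's actual API.

import java.util.ArrayList;
import java.util.List;

public class LoadWithCleanupSketch {

  // Hypothetical stand-in for Hive's MapJoinTableContainer.
  interface TableContainer {
    void load() throws Exception;
    void clear(); // must release memory and delete any spill files
  }

  private final List<TableContainer> containers = new ArrayList<>();

  void loadAll() throws Exception {
    try {
      for (TableContainer c : containers) {
        c.load();
      }
    } catch (Exception e) {
      // A partial failure can leave some containers loaded in memory and
      // others spilled to disk; clear everything before propagating.
      clearAllContainers();
      throw e;
    }
  }

  private void clearAllContainers() {
    for (TableContainer c : containers) {
      if (c != null) {
        c.clear();
      }
    }
  }
}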


http://git-wip-us.apache.org/repos/asf/hive/blob/07fcb098/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index f2b800a..1cfc411 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -284,7 +284,17 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
 
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
     loader.init(mapContext, mrContext, hconf, this);
-    loader.load(mapJoinTables, mapJoinTableSerdes);
+    try {
+      loader.load(mapJoinTables, mapJoinTableSerdes);
+    } catch (HiveException e) {
+      if (isLogInfoEnabled) {
+        LOG.info("Exception loading hash tables. Clearing partially loaded hash table containers.");
+      }
+
+      // there could be some spilled partitions which need to be cleaned up
+      clearAllTableContainers();
+      throw e;
+    }
 
     hashTblInitedOnce = true;
 
@@ -433,7 +443,7 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
   @Override
   public void closeOp(boolean abort) throws HiveException {
     boolean spilled = false;
-    for (MapJoinTableContainer container: mapJoinTables) {
+    for (MapJoinTableContainer container : mapJoinTables) {
       if (container != null) {
         spilled = spilled || container.hasSpill();
         container.dumpMetrics();
@@ -442,79 +452,93 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
 
     // For Hybrid Grace Hash Join, we need to see if there is any spilled data to be processed next
     if (spilled) {
-      if (hashMapRowGetters == null) {
-        hashMapRowGetters = new ReusableGetAdaptor[mapJoinTables.length];
-      }
-      int numPartitions = 0;
-      // Find out number of partitions for each small table (should be same across tables)
-      for (byte pos = 0; pos < mapJoinTables.length; pos++) {
-        if (pos != conf.getPosBigTable()) {
-          firstSmallTable = (HybridHashTableContainer)mapJoinTables[pos];
-          numPartitions = firstSmallTable.getHashPartitions().length;
-          break;
+      if (!abort) {
+        if (hashMapRowGetters == null) {
+          hashMapRowGetters = new ReusableGetAdaptor[mapJoinTables.length];
         }
-      }
-      assert numPartitions != 0 : "Number of partitions must be greater than 0!";
-
-      if (firstSmallTable.hasSpill()) {
-        spilledMapJoinTables = new MapJoinBytesTableContainer[mapJoinTables.length];
-        hybridMapJoinLeftover = true;
-
-        // Clear all in-memory partitions first
+        int numPartitions = 0;
+        // Find out number of partitions for each small table (should be same across tables)
         for (byte pos = 0; pos < mapJoinTables.length; pos++) {
-          MapJoinTableContainer tableContainer = mapJoinTables[pos];
-          if (tableContainer != null && tableContainer instanceof HybridHashTableContainer) {
-            HybridHashTableContainer hybridHtContainer = (HybridHashTableContainer) tableContainer;
-            hybridHtContainer.dumpStats();
-
-            HashPartition[] hashPartitions = hybridHtContainer.getHashPartitions();
-            // Clear all in memory partitions first
-            for (int i = 0; i < hashPartitions.length; i++) {
-              if (!hashPartitions[i].isHashMapOnDisk()) {
-                hybridHtContainer.setTotalInMemRowCount(
-                    hybridHtContainer.getTotalInMemRowCount() -
-                        hashPartitions[i].getHashMapFromMemory().getNumValues());
-                hashPartitions[i].getHashMapFromMemory().clear();
+          if (pos != conf.getPosBigTable()) {
+            firstSmallTable = (HybridHashTableContainer) mapJoinTables[pos];
+            numPartitions = firstSmallTable.getHashPartitions().length;
+            break;
+          }
+        }
+        assert numPartitions != 0 : "Number of partitions must be greater than 0!";
+
+        if (firstSmallTable.hasSpill()) {
+          spilledMapJoinTables = new MapJoinBytesTableContainer[mapJoinTables.length];
+          hybridMapJoinLeftover = true;
+
+          // Clear all in-memory partitions first
+          for (byte pos = 0; pos < mapJoinTables.length; pos++) {
+            MapJoinTableContainer tableContainer = mapJoinTables[pos];
+            if (tableContainer != null && tableContainer instanceof HybridHashTableContainer) {
+              HybridHashTableContainer hybridHtContainer = (HybridHashTableContainer) tableContainer;
+              hybridHtContainer.dumpStats();
+
+              HashPartition[] hashPartitions = hybridHtContainer.getHashPartitions();
+              // Clear all in memory partitions first
+              for (int i = 0; i < hashPartitions.length; i++) {
+                if (!hashPartitions[i].isHashMapOnDisk()) {
+                  hybridHtContainer.setTotalInMemRowCount(
+                      hybridHtContainer.getTotalInMemRowCount() -
+                          hashPartitions[i].getHashMapFromMemory().getNumValues());
+                  hashPartitions[i].getHashMapFromMemory().clear();
+                }
               }
+              assert hybridHtContainer.getTotalInMemRowCount() == 0;
             }
-            assert hybridHtContainer.getTotalInMemRowCount() == 0;
           }
-        }
 
-        // Reprocess the spilled data
-        for (int i = 0; i < numPartitions; i++) {
-          HashPartition[] hashPartitions = firstSmallTable.getHashPartitions();
-          if (hashPartitions[i].isHashMapOnDisk()) {
-            try {
-              continueProcess(i);     // Re-process spilled data
-            } catch (IOException e) {
-              e.printStackTrace();
-            } catch (SerDeException e) {
-              e.printStackTrace();
-            } catch (ClassNotFoundException e) {
-              e.printStackTrace();
-            }
-            for (byte pos = 0; pos < order.length; pos++) {
-              if (pos != conf.getPosBigTable())
-                spilledMapJoinTables[pos] = null;
+          // Reprocess the spilled data
+          for (int i = 0; i < numPartitions; i++) {
+            HashPartition[] hashPartitions = firstSmallTable.getHashPartitions();
+            if (hashPartitions[i].isHashMapOnDisk()) {
+              try {
+                continueProcess(i);     // Re-process spilled data
+              } catch (Exception e) {
+                throw new HiveException(e);
+              }
+              for (byte pos = 0; pos < order.length; pos++) {
+                if (pos != conf.getPosBigTable())
+                  spilledMapJoinTables[pos] = null;
+              }
             }
           }
         }
       }
+
+      if (isLogInfoEnabled) {
+        LOG.info("spilled: " + spilled + " abort: " + abort + ". Clearing spilled partitions.");
+      }
+
+      // spilled tables are always loaded (no sharing), so clear them
+      clearAllTableContainers();
+      cache.remove(cacheKey);
     }
 
+    // in the mapreduce case we always need to clean up, as mapreduce has no object registry.
     if ((this.getExecContext() != null) && (this.getExecContext().getLocalWork() != null)
-        && (this.getExecContext().getLocalWork().getInputFileChangeSensitive())
-        && mapJoinTables != null) {
+        && (this.getExecContext().getLocalWork().getInputFileChangeSensitive())) {
+      if (isLogInfoEnabled) {
+        LOG.info("MR: Clearing all map join table containers.");
+      }
+      clearAllTableContainers();
+    }
+
+    super.closeOp(abort);
+  }
+
+  private void clearAllTableContainers() {
+    if (mapJoinTables != null) {
       for (MapJoinTableContainer tableContainer : mapJoinTables) {
         if (tableContainer != null) {
           tableContainer.clear();
         }
       }
     }
-    cache.release(cacheKey);
-    this.loader = null;
-    super.closeOp(abort);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/07fcb098/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
index f0df2d3..440e0a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java
@@ -53,4 +53,11 @@ public interface ObjectCache {
    * @return the last cached object with the key, null if none.
    */
   public <T> Future<T> retrieveAsync(String key, Callable<T> fn) throws HiveException;
+
+  /**
+   * Removes the specified key from the object cache.
+   *
+   * @param key - key to be removed
+   */
+  public void remove(String key);
 }
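
The interface change above adds a remove(String) operation so a single key can be evicted, e.g. after spilled partitions have been reprocessed and the cached tables are stale. As a rough illustration of the contract only, assuming nothing beyond the JDK, a map-backed toy cache might look like this (SimpleObjectCache is hypothetical, not Hive code):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SimpleObjectCache {
  private final ConcurrentMap<String, Object> store = new ConcurrentHashMap<>();

  public Object retrieve(String key) {
    return store.get(key); // null if nothing cached under this key
  }

  public void cache(String key, Object value) {
    store.put(key, value);
  }

  // The operation this patch adds: evict one key so the next retrieve()
  // reloads instead of reusing a stale object.
  public void remove(String key) {
    store.remove(key);
  }
}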

http://git-wip-us.apache.org/repos/asf/hive/blob/07fcb098/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
index a6f698d..bf4ae8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java
@@ -91,4 +91,9 @@ public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
       }
     };
   }
+
+  @Override
+  public void remove(String key) {
+    // nothing to do
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/07fcb098/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index 3f6d61e..412226e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -183,6 +183,36 @@ public class HybridHashTableContainer
     public boolean isHashMapOnDisk() {
       return hashMapOnDisk;
     }
+
+    public void clear() {
+      if (hashMap != null) {
+        hashMap.clear();
+        hashMap = null;
+      }
+
+      if (hashMapLocalPath != null) {
+        try {
+          Files.delete(hashMapLocalPath);
+        } catch (Throwable ignored) {
+        }
+        hashMapLocalPath = null;
+      }
+
+      if (sidefileKVContainer != null) {
+        sidefileKVContainer.clear();
+        sidefileKVContainer = null;
+      }
+
+      if (matchfileObjContainer != null) {
+        matchfileObjContainer.clear();
+        matchfileObjContainer = null;
+      }
+
+      if (matchfileRowBytesContainer != null) {
+        matchfileRowBytesContainer.clear();
+        matchfileRowBytesContainer = null;
+      }
+    }
   }
 
   public HybridHashTableContainer(Configuration hconf, long keyCount, long memoryAvailable,
@@ -546,12 +576,11 @@ public class HybridHashTableContainer
     return toSpillPartitionId;
   }
 
-  /* Clean up in memory hashtables */
   @Override
   public void clear() {
     for (HashPartition hp : hashPartitions) {
-      if (hp.hashMap != null) {
-        hp.hashMap.clear();
+      if (hp != null) {
+        hp.clear();
       }
     }
     memoryUsed = 0;
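
The new HashPartition.clear() above follows a defensive, idempotent pattern: null-check each resource, release it, null out the field so a repeated clear() is a no-op, and delete the spill file best-effort. A standalone sketch of the same pattern, with illustrative field names (inMemoryMap, spillFile) rather than the container's real members:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

public class SpillableSlot {
  private Map<String, byte[]> inMemoryMap = new HashMap<>();
  private Path spillFile; // non-null only if this slot spilled to disk

  public void clear() {
    if (inMemoryMap != null) {
      inMemoryMap.clear();
      inMemoryMap = null; // make a second clear() a no-op
    }
    if (spillFile != null) {
      try {
        // Best-effort delete: an undeletable temp file should not
        // fail the rest of the cleanup.
        Files.deleteIfExists(spillFile);
      } catch (IOException ignored) {
      }
      spillFile = null;
    }
  }
}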

http://git-wip-us.apache.org/repos/asf/hive/blob/07fcb098/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
index 6a81f11..536b92c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
-import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
@@ -181,10 +180,6 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
         }
         tableContainer.seal();
         mapJoinTables[pos] = tableContainer;
-      } catch (IOException e) {
-        throw new HiveException(e);
-      } catch (SerDeException e) {
-        throw new HiveException(e);
       } catch (Exception e) {
         throw new HiveException(e);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/07fcb098/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
index c0bcb21..64295d4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java
@@ -93,4 +93,10 @@ public class ObjectCache implements org.apache.hadoop.hive.ql.exec.ObjectCache {
       }
     });
   }
+
+  @Override
+  public void remove(String key) {
+    LOG.info("Removing key: " + key);
+    registry.delete(key);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/07fcb098/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java
index c8359d3..1c91be6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java
@@ -290,7 +290,7 @@ public class VectorMapJoinRowBytesContainer {
     return currentLength;
   }
 
-  public void clear() throws IOException {
+  public void clear() {
     if (fileInputStream != null) {
       try {
         fileInputStream.close();
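
The final hunk drops "throws IOException" from clear() in VectorMapJoinRowBytesContainer: close() failures are caught internally so cleanup never forces callers into I/O error handling. A minimal, hypothetical helper showing the same close-quietly idea:

import java.io.Closeable;
import java.io.IOException;

public final class QuietCloser {
  private QuietCloser() {}

  public static void closeQuietly(Closeable c) {
    if (c != null) {
      try {
        c.close();
      } catch (IOException ignored) {
        // swallow: cleanup paths should not propagate close() failures
      }
    }
  }
}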

