hive-commits mailing list archives

From: xu...@apache.org
Subject: svn commit: r1637860 - /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
Date: Mon, 10 Nov 2014 13:53:07 GMT
Author: xuefu
Date: Mon Nov 10 13:53:06 2014
New Revision: 1637860

URL: http://svn.apache.org/r1637860
Log:
HIVE-8621: Dump small table join data for map-join [Spark Branch] (Jimmy via Xuefu)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java?rev=1637860&r1=1637859&r2=1637860&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java Mon Nov 10 13:53:06 2014
@@ -31,12 +31,13 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinEagerRowContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKeyObject;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinPersistableTableContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinEagerRowContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
@@ -49,11 +50,10 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.util.ReflectionUtils;
 
+@SuppressWarnings({"rawtypes", "deprecation"})
 public class HashTableSinkOperator extends TerminalOperator<HashTableSinkDesc> implements
     Serializable {
   private static final long serialVersionUID = 1L;
@@ -93,6 +93,10 @@ public class HashTableSinkOperator exten
   private transient Byte[] order; // order in which the results should be output
   private Configuration hconf;
 
+  // Used as a differentiator for different files
+  // in case multiple files are created for one operator.
+  private int fileIndex = 0;
+
   private transient MapJoinPersistableTableContainer[] mapJoinTables;
   private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes;
 
@@ -197,25 +201,24 @@ public class HashTableSinkOperator exten
     return mapJoinTables;
   }
 
-  private static List<ObjectInspector>[] getStandardObjectInspectors(
-      List<ObjectInspector>[] aliasToObjectInspectors, int maxTag) {
-    @SuppressWarnings("unchecked")
-    List<ObjectInspector>[] result = new List[maxTag];
-    for (byte alias = 0; alias < aliasToObjectInspectors.length; alias++) {
-      List<ObjectInspector> oiList = aliasToObjectInspectors[alias];
-      if (oiList == null) {
-        continue;
-      }
-      ArrayList<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>(oiList.size());
-      for (int i = 0; i < oiList.size(); i++) {
-        fieldOIList.add(ObjectInspectorUtils.getStandardObjectInspector(oiList.get(i),
-            ObjectInspectorCopyOption.WRITABLE));
-      }
-      result[alias] = fieldOIList;
-    }
-    return result;
-
-  }
+//  private static List<ObjectInspector>[] getStandardObjectInspectors(
+//      List<ObjectInspector>[] aliasToObjectInspectors, int maxTag) {
+//    @SuppressWarnings("unchecked")
+//    List<ObjectInspector>[] result = new List[maxTag];
+//    for (byte alias = 0; alias < aliasToObjectInspectors.length; alias++) {
+//      List<ObjectInspector> oiList = aliasToObjectInspectors[alias];
+//      if (oiList == null) {
+//        continue;
+//      }
+//      ArrayList<ObjectInspector> fieldOIList = new ArrayList<ObjectInspector>(oiList.size());
+//      for (int i = 0; i < oiList.size(); i++) {
+//        fieldOIList.add(ObjectInspectorUtils.getStandardObjectInspector(oiList.get(i),
+//            ObjectInspectorCopyOption.WRITABLE));
+//      }
+//      result[alias] = fieldOIList;
+//    }
+//    return result;
+//  }
 
   /*
    * This operator only process small tables Read the key/value pairs Load them into hashtable
@@ -295,11 +298,22 @@ public class HashTableSinkOperator exten
       // get the tmp URI path; it will be a hdfs path if not local mode
       String dumpFilePrefix = conf.getDumpFilePrefix();
       Path path = Utilities.generatePath(tmpURI, dumpFilePrefix, tag, fileName);
+      FileSystem fs = path.getFileSystem(hconf);
+      short replication = fs.getDefaultReplication(path);
+
+      // For Spark, path is a folder. Let's create it now.
+      if (HiveConf.getVar(hconf, ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+        fs.mkdirs(path);  // Create the folder and its parents if not there
+        path = new Path(path, getOperatorId() + "-" + (fileIndex++));
+        // TODO find out numOfPartitions for the big table
+        int numOfPartitions = 10;
+        replication = (short)Math.min(10, numOfPartitions);
+      }
       console.printInfo(Utilities.now() + "\tDump the side-table for tag: " + tag +
           " with group count: " + tableContainer.size() + " into file: " + path);
       // get the hashtable file and path
-      FileSystem fs = path.getFileSystem(hconf);
-      ObjectOutputStream out = new ObjectOutputStream(new BufferedOutputStream(fs.create(path), 4096));
+      ObjectOutputStream out = new ObjectOutputStream(
+        new BufferedOutputStream(fs.create(path, replication), 4096));
       try {
         mapJoinTableSerdes[tag].persist(out, tableContainer);
       } finally {
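
For readers following the patch, below is a minimal standalone sketch of the Spark-mode dump logic added above. It is not part of the commit: the class name, the openDumpStream helper, and its parameters are illustrative, and the hard-coded numOfPartitions mirrors the TODO placeholder in the diff rather than a computed value. Only Hadoop FileSystem calls that appear in the patch are used.

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SparkDumpSketch {
  // Opens the stream that the small-table hashtable is serialized into.
  // Under Spark the incoming path is treated as a folder: each dump becomes
  // a file inside it, named by operator id plus a running index so that
  // multiple files created by one operator do not collide.
  static ObjectOutputStream openDumpStream(Configuration hconf, Path path,
      String operatorId, int fileIndex) throws IOException {
    FileSystem fs = path.getFileSystem(hconf);
    short replication = fs.getDefaultReplication(path);
    if ("spark".equals(hconf.get("hive.execution.engine"))) {
      fs.mkdirs(path);  // create the folder and its parents if not there
      path = new Path(path, operatorId + "-" + fileIndex);
      int numOfPartitions = 10;  // placeholder, per the TODO in the patch
      // Cap replication at 10: enough copies for many big-table tasks to
      // read the dump in parallel without over-replicating small files.
      replication = (short) Math.min(10, numOfPartitions);
    }
    return new ObjectOutputStream(
        new BufferedOutputStream(fs.create(path, replication), 4096));
  }
}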


