hadoop-common-commits mailing list archives

From: dhr...@apache.org
Subject: svn commit: r712905 [2/38] - in /hadoop/core/trunk: ./ src/contrib/hive/ src/contrib/hive/cli/src/java/org/apache/hadoop/hive/cli/ src/contrib/hive/common/src/java/org/apache/hadoop/hive/conf/ src/contrib/hive/conf/ src/contrib/hive/data/files/ src/con...
Date: Tue, 11 Nov 2008 01:50:18 GMT
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Mon Nov 10 17:50:06 2008
@@ -74,6 +74,7 @@
 
   transient HiveConf conf;
   static final private int separator  = Utilities.tabCode;
+  static final private int singleQuote  = '\'';
   static final private int terminator = Utilities.newLineCode;
   
   public void initialize(HiveConf conf) {
@@ -95,7 +96,6 @@
 
         // create the table
         Table tbl = new Table(crtTbl.getTableName());
-        tbl.setFields(crtTbl.getCols());
         StorageDescriptor tblStorDesc = tbl.getTTable().getSd();
         if (crtTbl.getBucketCols() != null)
           tblStorDesc.setBucketCols(crtTbl.getBucketCols());
@@ -169,7 +169,7 @@
           List<String> bucketCols = tbl.getBucketCols();
           List<Order> sortCols = tbl.getSortCols();
 
-          if (sortCols.size() >= bucketCols.size())
+          if ( (sortCols.size() > 0) && (sortCols.size() >= bucketCols.size()))
           {
             boolean found = true;
 
@@ -201,6 +201,10 @@
         // set create time
         tbl.getTTable().setCreateTime((int) (System.currentTimeMillis()/1000));
 
+        if(crtTbl.getCols() != null) {
+          tbl.setFields(crtTbl.getCols());
+        }
+
         // create the table
         db.createTable(tbl);
         return 0;
@@ -280,6 +284,20 @@
           }
           tbl.getTTable().getSd().setCols(alterTbl.getNewCols());
         }
+        else if (alterTbl.getOp() == alterTableDesc.alterTableTypes.ADDPROPS) {
+          tbl.getTTable().getParameters().putAll(alterTbl.getProps());
+        }
+        else if (alterTbl.getOp() == alterTableDesc.alterTableTypes.ADDSERDEPROPS) {
+          tbl.getTTable().getSd().getSerdeInfo().getParameters().putAll(alterTbl.getProps());
+        }
+        else if (alterTbl.getOp() == alterTableDesc.alterTableTypes.ADDSERDE) {
+          tbl.setSerializationLib(alterTbl.getSerdeName());
+          if ((alterTbl.getProps() != null) && (alterTbl.getProps().size() > 0))
+            tbl.getTTable().getSd().getSerdeInfo().getParameters().putAll(alterTbl.getProps());
+          // since the serde has been modified, reset the columns etc. accordingly
+          tbl.reinitSerDe();
+          tbl.setFields(Hive.getFieldsFromDeserializer(tbl.getName(), tbl.getDeserializer()));
+        }
         else {
           console.printError("Unsupported Alter command");
           return 1;
@@ -357,7 +375,9 @@
             if (col.getComment() != null)
             {
               os.write(separator);
+              os.write(singleQuote);
               os.write(col.getComment().getBytes("UTF-8"));
+              os.write(singleQuote);
             }
             firstCol = false;
           }
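
For reference, the last DDLTask hunk above changes the column-description output so that a column comment is wrapped in single quotes when it is written after the tab separator. Below is a minimal standalone sketch of that output format, using plain JDK I/O only; the field layout, class name, and sample data are illustrative, and the tab/newline values are assumed to match Utilities.tabCode and Utilities.newLineCode in the patch.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class DescribeOutputSketch {
  // assumed to match Utilities.tabCode / newLineCode and the new singleQuote constant
  private static final int SEPARATOR = '\t';
  private static final int TERMINATOR = '\n';
  private static final int SINGLE_QUOTE = '\'';

  // write one "name<TAB>type[<TAB>'comment']<NL>" line, as DDLTask does per column
  static void writeColumn(OutputStream os, String name, String type, String comment)
      throws IOException {
    os.write(name.getBytes("UTF-8"));
    os.write(SEPARATOR);
    os.write(type.getBytes("UTF-8"));
    if (comment != null) {
      os.write(SEPARATOR);
      os.write(SINGLE_QUOTE);              // new in this patch: quote the comment
      os.write(comment.getBytes("UTF-8"));
      os.write(SINGLE_QUOTE);
    }
    os.write(TERMINATOR);
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream os = new ByteArrayOutputStream();
    writeColumn(os, "userid", "int", null);
    writeColumn(os, "comment", "string", "free form text");
    System.out.print(os.toString("UTF-8"));
  }
}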

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Mon Nov 10 17:50:06 2008
@@ -34,11 +34,13 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.plan.mapredWork;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.io.*;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.ql.session.SessionState;
 
 public class ExecDriver extends Task<mapredWork> implements Serializable {
 
@@ -54,12 +56,37 @@
     super();
   }
 
+  public static String getRealFiles(Configuration conf) {
+    // fill in local files to be added to the task environment
+    SessionState ss = SessionState.get();
+    Set<String> files = (ss == null) ? null : ss.list_resource(SessionState.ResourceType.FILE, null);
+    if(files != null) {
+      ArrayList<String> realFiles = new ArrayList<String> (files.size());
+      for(String one: files) {
+        try {
+          realFiles.add(Utilities.realFile(one, conf));
+        } catch (IOException e) {
+          throw new RuntimeException ("Cannot validate file " + one +
+                                      "due to exception: " + e.getMessage(), e);
+        }
+      }
+      return StringUtils.join(realFiles, ",");
+    } else {
+      return "";
+    }
+  }
+
+
   /**
    * Initialization when invoked from QL
    */
   public void initialize (HiveConf conf) {
     super.initialize(conf);
     job = new JobConf(conf, ExecDriver.class);
+    String realFiles = getRealFiles(job);
+    if (realFiles != null && realFiles.length() > 0) {
+      job.set("tmpfiles", realFiles);
+    }
   }
 
   /**
@@ -121,8 +148,7 @@
               }
             }
           }
-        }
-                                           );
+        });
     }
   }
 
@@ -207,6 +233,7 @@
 
     Utilities.setMapRedWork(job, work);
     
+    
     for(String onefile: work.getPathToAliases().keySet()) {
       LOG.info("Adding input file " + onefile);
       FileInputFormat.addInputPaths(job, onefile);
@@ -217,8 +244,8 @@
     FileOutputFormat.setOutputPath(job, new Path(jobScratchDir));
     job.setMapperClass(ExecMapper.class);
     
-    job.setMapOutputValueClass(Text.class);
     job.setMapOutputKeyClass(HiveKey.class);    
+    job.setMapOutputValueClass(BytesWritable.class);
     
     job.setNumReduceTasks(work.getNumReduceTasks().intValue());
     job.setReducerClass(ExecReducer.class);
@@ -265,6 +292,10 @@
       
       inferNumReducers();
       JobClient jc = new JobClient(job);
+      
+      // make this client wait if the job tracker is not behaving well.
+      Throttle.checkJobTracker(job, LOG);
+
       rj = jc.submitJob(job);
 
       // add to list of running jobs so in case of abnormal shutdown can kill it.
@@ -306,7 +337,8 @@
   }
   
   private static void printUsage() {
-    System.out.println("ExecDriver -plan <plan-file> [-jobconf k1=v1 [-jobconf k2=v2] ...]");
+    System.out.println("ExecDriver -plan <plan-file> [-jobconf k1=v1 [-jobconf k2=v2] ...] "+
+                       "[-files <file1>[,<file2>] ...]");
     System.exit(1);
   }
 
@@ -314,15 +346,19 @@
     String planFileName = null;
     ArrayList<String> jobConfArgs = new ArrayList<String> ();
     boolean isSilent = false;
+    String files = null;
 
     try{
       for(int i=0; i<args.length; i++) {
         if(args[i].equals("-plan")) {
           planFileName = args[++i];
+          System.out.println("plan = "+planFileName);
         } else if (args[i].equals("-jobconf")) {
           jobConfArgs.add(args[++i]);
         } else if (args[i].equals("-silent")) {
           isSilent = true;
+        } else if (args[i].equals("-files")) {
+          files = args[++i];
         }
       }
     } catch (IndexOutOfBoundsException e) {
@@ -350,6 +386,10 @@
       }
     }
 
+    if(files != null) {
+      conf.set("tmpfiles", files);
+    }
+
     URI pathURI = (new Path(planFileName)).toUri();
     InputStream pathData;
     if(StringUtils.isEmpty(pathURI.getScheme())) {
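
The ExecDriver changes above thread session-level resource files into the MapReduce job as a comma-separated "tmpfiles" setting, either from SessionState in initialize() or from the new -files command-line option in main(). A rough standalone sketch of the join-and-set step follows; it substitutes a plain JDK existence check and absolute path for Utilities.realFile and the Hive SessionState, both of which are Hive-specific.

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TmpFilesSketch {
  // mirrors getRealFiles(): validate each resource, then join with commas
  static String joinRealFiles(List<String> files) {
    List<String> realFiles = new ArrayList<String>(files.size());
    for (String one : files) {
      File f = new File(one);
      if (!f.exists()) {
        // the patch wraps the failure in a RuntimeException naming the file
        throw new RuntimeException("Cannot validate file " + one);
      }
      realFiles.add(f.getAbsolutePath());
    }
    return String.join(",", realFiles);
  }

  public static void main(String[] args) {
    String tmpfiles = joinRealFiles(Arrays.asList(args));
    // ExecDriver.initialize() then does the equivalent of: job.set("tmpfiles", tmpfiles)
    System.out.println("tmpfiles=" + tmpfiles);
  }
}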

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java Mon Nov 10 17:50:06 2008
@@ -27,20 +27,17 @@
 import org.apache.commons.logging.LogFactory;
 
 
-import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.mapredWork;
 import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.exec.ExecMapper.reportStats;
-import org.apache.hadoop.hive.serde2.ColumnSet;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.objectinspector.MetadataListStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 
 public class ExecReducer extends MapReduceBase implements Reducer {
 
@@ -74,15 +71,23 @@
     reducer.setMapredWork(gWork);
     isTagged = gWork.getNeedsTagging();
     try {
-      // We should initialize the SerDe with the TypeInfo when available.
-      tableDesc keyTableDesc = PlanUtils.getReduceKeyDesc(gWork);
+      tableDesc keyTableDesc = gWork.getKeyDesc();
       inputKeyDeserializer = (SerDe)ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), null);
       inputKeyDeserializer.initialize(null, keyTableDesc.getProperties());
-      for(int tag=0; tag<Byte.MAX_VALUE; tag++) {
+      keyObjectInspector = inputKeyDeserializer.getObjectInspector();
+      for(int tag=0; tag<gWork.getTagToValueDesc().size(); tag++) {
         // We should initialize the SerDe with the TypeInfo when available.
-        tableDesc valueTableDesc = PlanUtils.getReduceValueDesc(gWork, tag);
+        tableDesc valueTableDesc = gWork.getTagToValueDesc().get(tag);
         inputValueDeserializer[tag] = (SerDe)ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), null);
         inputValueDeserializer[tag].initialize(null, valueTableDesc.getProperties());
+        valueObjectInspector[tag] = inputValueDeserializer[tag].getObjectInspector();
+        
+        ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+        ois.add(keyObjectInspector);
+        ois.add(valueObjectInspector[tag]);
+        ois.add(ObjectInspectorFactory.getStandardPrimitiveObjectInspector(Byte.class));
+        rowObjectInspector[tag] = ObjectInspectorFactory.getStandardStructObjectInspector(
+            Arrays.asList(fieldNames), ois);
       }
     } catch (SerDeException e) {
       throw new RuntimeException(e);
@@ -143,18 +148,12 @@
       } catch (SerDeException e) {
         throw new HiveException(e);
       }
-      // This is a hack for generating the correct ObjectInspector.
-      // In the future, we should use DynamicSerde and initialize it using the type info. 
-      if (keyObjectInspector == null) {
-        // Directly create ObjectInspector here because we didn't know the number of cols till now.
-        keyObjectInspector = MetadataListStructObjectInspector.getInstance(((ColumnSet)keyObject).col.size()); 
-      }
       // System.err.print(keyObject.toString());
       while (values.hasNext()) {
-        Text valueText = (Text)values.next();
+        Writable valueWritable = (Writable) values.next();
         //System.err.print(who.getHo().toString());
         try {
-          valueObject[tag] = inputValueDeserializer[tag].deserialize(valueText);
+          valueObject[tag] = inputValueDeserializer[tag].deserialize(valueWritable);
         } catch (SerDeException e) {
           throw new HiveException(e);
         }
@@ -162,23 +161,12 @@
         row.add(keyObject);
         row.add(valueObject[tag]);
         row.add(tag);
-        if (valueObjectInspector[tag] == null) {
-          // Directly create ObjectInspector here because we didn't know the number of cols till now.
-          valueObjectInspector[tag] = MetadataListStructObjectInspector.getInstance(((ColumnSet)valueObject[tag]).col.size());
-          ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
-          ois.add(keyObjectInspector);
-          ois.add(valueObjectInspector[tag]);
-          ois.add(ObjectInspectorFactory.getStandardPrimitiveObjectInspector(Byte.class));
-          rowObjectInspector[tag] = ObjectInspectorFactory.getStandardStructObjectInspector(
-              Arrays.asList(fieldNames), ois);
-        }
         reducer.process(row, rowObjectInspector[tag]);
       }
 
-
     } catch (HiveException e) {
       abort = true;
-      throw new IOException (e.getMessage());
+      throw new IOException (e);
     }
   }
 

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFuncEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFuncEvaluator.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFuncEvaluator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFuncEvaluator.java Mon Nov 10 17:50:06 2008
@@ -72,19 +72,8 @@
       paramEvaluators[i].evaluate(row, rowInspector, paramInspectableObjects[i]);
       paramValues[i] = paramInspectableObjects[i].o;
     }
-    try {
-      result.o = udfMethod.invoke(udf, paramValues);
-      result.oi = outputObjectInspector;
-    } catch (Exception e) {
-      if (e instanceof HiveException) {
-        throw (HiveException)e;
-      } else if (e instanceof RuntimeException) {
-        throw (RuntimeException)e;
-      } else {
-        throw new HiveException("Unable to execute UDF function " + udf.getClass() + " " 
-          + udfMethod + " on inputs " + "(" + paramValues.length + ") " + Arrays.asList(paramValues) + ": " + e.getMessage(), e);
-      }
-    }
+    result.o = FunctionRegistry.invoke(udfMethod, udf, paramValues);
+    result.oi = outputObjectInspector;
   }
 
   public ObjectInspector evaluateInspector(ObjectInspector rowInspector)

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java Mon Nov 10 17:50:06 2008
@@ -44,4 +44,6 @@
     eval.evaluate(row, rowInspector, result);
     forward(result.o, result.oi);
   }
+
+  
 }

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java Mon Nov 10 17:50:06 2008
@@ -20,7 +20,12 @@
 
 import java.io.Serializable;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Vector;
 import java.util.Properties;
@@ -29,8 +34,11 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.plan.fetchWork;
+import org.apache.hadoop.hive.ql.plan.partitionDesc;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -45,6 +53,8 @@
 import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 /**
  * FetchTask implementation
@@ -56,42 +66,27 @@
   
   public void initialize (HiveConf conf) {
    	super.initialize(conf);
-    splitNum = 0;
     currRecReader = null;
     
    	try {
        // Create a file system handle
        fs = FileSystem.get(conf);   
-       serde = work.getDeserializerClass().newInstance();
-       serde.initialize(null, work.getSchema());
        job = new JobConf(conf, ExecDriver.class);
-       Path inputP = work.getSrcDir();
-       if(!fs.exists(inputP)) {
-         empty = true;
-         return;
-       }
-
-       empty = true;
-       FileStatus[] fStats = fs.listStatus(inputP);
-       for (FileStatus fStat:fStats) {
-         if (fStat.getLen() > 0) {
-           empty = false;
-           break;
-         }
-       }
-
-       if (empty)
-         return;
-
-       FileInputFormat.setInputPaths(job, inputP);
-       inputFormat = getInputFormatFromCache(work.getInputFormatClass(), job);
-	     inputSplits = inputFormat.getSplits(job, 1);
+       
 	 	   mSerde = new MetadataTypedColumnsetSerDe();
        Properties mSerdeProp = new Properties();
        mSerdeProp.put(Constants.SERIALIZATION_FORMAT, "" + Utilities.tabCode);
        mSerdeProp.put(Constants.SERIALIZATION_NULL_FORMAT, "NULL");
        mSerde.initialize(null, mSerdeProp);
+       
+       currPath = null;
+       currTbl = null;
+       currPart = null;
+       iterPath = null;
+       iterPartDesc = null;
        totalRows = 0;
+       tblDataDone = false;
+       rowWithPart = new Object[2];
     } catch (Exception e) {
       // Bail out ungracefully - we should never hit
       // this here - but would have hit it in SemanticAnalyzer
@@ -136,11 +131,116 @@
 	private Deserializer  serde;
 	private MetadataTypedColumnsetSerDe mSerde;
 	private int totalRows;
-  private boolean empty;
+  private Iterator<Path> iterPath;
+  private Iterator<partitionDesc> iterPartDesc; 
+  private Path currPath;
+  private partitionDesc currPart;
+  private tableDesc     currTbl;
+  private boolean       tblDataDone;
+  private StructObjectInspector rowObjectInspector;
+  private Object[] rowWithPart;
+
+  private void setPrtnDesc() throws Exception {
+    List<String> partNames = new ArrayList<String>();
+    List<String> partValues = new ArrayList<String>();
+    
+    String pcols = currPart.getTableDesc().getProperties().getProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS);
+    LinkedHashMap<String, String> partSpec = currPart.getPartSpec();
+    
+    List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>();
+    String[] partKeys = pcols.trim().split("/");
+    for(String key: partKeys) {
+      partNames.add(key);
+      partValues.add(partSpec.get(key));
+      partObjectInspectors.add(ObjectInspectorFactory.getStandardPrimitiveObjectInspector(String.class));
+    }
+    StructObjectInspector partObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(partNames, partObjectInspectors);
+    rowObjectInspector = (StructObjectInspector)serde.getObjectInspector();
+    
+    rowWithPart[1] = partValues;
+    rowObjectInspector = ObjectInspectorFactory.getUnionStructObjectInspector(Arrays.asList(new StructObjectInspector[]{
+                                                                                              rowObjectInspector, partObjectInspector}));
+  }
+
+  private void getNextPath() throws Exception {
+    // first time
+    if (iterPath == null) {
+      if (work.getTblDir() != null) {
+        if (!tblDataDone) {
+          currPath = work.getTblDir();
+          currTbl = work.getTblDesc();
+          if (fs.exists(currPath)) 
+          {
+            FileStatus[] fStats = fs.listStatus(currPath);
+            for (FileStatus fStat:fStats) {
+              if (fStat.getLen() > 0) {
+                tblDataDone = true;
+                break;
+              }
+            }
+          }
+
+          if (!tblDataDone) currPath = null;
+          return;
+        } else {
+          currTbl = null;
+          currPath = null;
+        }
+        return;
+      }
+      else {
+        iterPath = work.getPartDir().iterator();
+        iterPartDesc = work.getPartDesc().iterator();
+      }
+    }
+
+		while (iterPath.hasNext()) {
+			Path nxt = iterPath.next();
+      partitionDesc prt = iterPartDesc.next();
+		  if (fs.exists(nxt)) 
+      {
+        FileStatus[] fStats = fs.listStatus(nxt);
+        for (FileStatus fStat:fStats) {
+          if (fStat.getLen() > 0) {
+            currPath = nxt;
+            currPart = prt;
+            return;
+          }
+        }
+      }
+		}
+	}
   
  	private RecordReader<WritableComparable, Writable> getRecordReader() throws Exception {
-		if (splitNum >= inputSplits.length) 
-  	  return null;
+ 		if (currPath == null) {
+ 			getNextPath();
+ 			if (currPath == null)
+ 				return null;
+
+ 			FileInputFormat.setInputPaths(job, currPath);
+      tableDesc tmp = currTbl;
+      if (tmp == null)
+        tmp = currPart.getTableDesc();
+ 			inputFormat = getInputFormatFromCache(tmp.getInputFileFormatClass(), job);
+ 			inputSplits = inputFormat.getSplits(job, 1); 		
+ 			splitNum = 0;
+      serde = tmp.getDeserializerClass().newInstance();
+      serde.initialize(null, tmp.getProperties());
+      LOG.debug("Creating fetchTask with deserializer typeinfo: " + serde.getObjectInspector().getTypeName());
+      LOG.debug("deserializer properties: " + tmp.getProperties());
+      if (!tblDataDone)
+        setPrtnDesc();
+ 		}
+ 		
+ 		if (splitNum >= inputSplits.length) {
+ 			if (currRecReader != null) {
+ 				currRecReader.close();
+        currRecReader = null;
+      }
+ 			currPath = null;
+ 			return getRecordReader();
+ 		}
+ 		
 		currRecReader = inputFormat.getRecordReader(inputSplits[splitNum++], job, Reporter.NULL);
 		key = currRecReader.createKey();
 		value = currRecReader.createValue();
@@ -149,16 +249,15 @@
  	
   public boolean fetch(Vector<String> res) {
   	try {
-      if (empty)
-        return false;
-
       int numRows = 0;
       int rowsRet = MAX_ROWS;
       if ((work.getLimit() >= 0) && ((work.getLimit() - totalRows) < rowsRet))
         rowsRet = work.getLimit() - totalRows;
       if (rowsRet <= 0) {
-        if (currRecReader != null)
+        if (currRecReader != null) {
           currRecReader.close();
+          currRecReader = null;
+        }
         return false;
       }
 
@@ -174,12 +273,18 @@
   	    }
       	boolean ret = currRecReader.next(key, value);
    	  	if (ret) {
-   	  		Object obj = serde.deserialize(value);
-   	  		res.add(((Text)mSerde.serialize(obj, serde.getObjectInspector())).toString());
+          if (tblDataDone) {
+            Object obj = serde.deserialize(value);
+            res.add(((Text)mSerde.serialize(obj, serde.getObjectInspector())).toString());
+          } else {
+            rowWithPart[0] = serde.deserialize(value);
+            res.add(((Text)mSerde.serialize(rowWithPart, rowObjectInspector)).toString());
+          }
    	  		numRows++;
    	  	}
    	  	else {
           currRecReader.close();
+          currRecReader = null;
    	  		currRecReader = getRecordReader();
    	  		if (currRecReader == null) {
             if (numRows == 0) 
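
FetchTask now iterates over the table directory and then each partition directory, skipping any path that is missing or contains only zero-length files, instead of the old single-directory emptiness check. Below is a minimal sketch of that "first non-empty path" scan using the Hadoop FileSystem API; the candidate partition paths are placeholders.

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class NextPathSketch {
  // return the next candidate path that exists and has at least one non-empty file,
  // mirroring the loop in getNextPath()
  static Path nextNonEmptyPath(FileSystem fs, Iterator<Path> iterPath) throws IOException {
    while (iterPath.hasNext()) {
      Path nxt = iterPath.next();
      if (fs.exists(nxt)) {
        for (FileStatus fStat : fs.listStatus(nxt)) {
          if (fStat.getLen() > 0) {
            return nxt;
          }
        }
      }
    }
    return null; // nothing left to fetch
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // hypothetical partition directories
    List<Path> parts = Arrays.asList(new Path("/warehouse/t/ds=2008-11-09"),
                                     new Path("/warehouse/t/ds=2008-11-10"));
    System.out.println("first non-empty path: " + nextNonEmptyPath(fs, parts.iterator()));
  }
}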

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Mon Nov 10 17:50:06 2008
@@ -19,13 +19,18 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.*;
+import java.util.HashMap;
+import java.util.List;
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.filterDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
 
 /**
  * Filter operator implementation
@@ -73,4 +78,15 @@
           conditionInspectableObject.o.getClass().getName());
     }
   }
+  
+  public List<String> mergeColListsFromChildren(List<String> colList,
+                                        HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx) {
+    exprNodeDesc condn = conf.getPredicate();
+
+    // get list of columns used in the filter
+    List<String> cl = condn.getCols();
+
+    return Utilities.mergeUniqElems(colList, cl);
+  }
+
 }
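
The new mergeColListsFromChildren hook merges the columns referenced by the filter predicate into the column list coming from the children, presumably so that column pruning keeps everything the predicate needs. A small sketch of that merge step is shown below, under the assumption that Utilities.mergeUniqElems appends only the elements not already present; the column names are made up.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MergeColsSketch {
  // assumed behaviour of Utilities.mergeUniqElems: union, preserving order, no duplicates
  static List<String> mergeUniqElems(List<String> dest, List<String> src) {
    if (dest == null) dest = new ArrayList<String>();
    if (src == null) return dest;
    for (String s : src) {
      if (!dest.contains(s)) {
        dest.add(s);
      }
    }
    return dest;
  }

  public static void main(String[] args) {
    List<String> fromChildren = new ArrayList<String>(Arrays.asList("userid", "pageid"));
    List<String> predicateCols = Arrays.asList("pageid", "ds"); // columns used in the filter
    System.out.println(mergeUniqElems(fromChildren, predicateCols)); // [userid, pageid, ds]
  }
}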

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Mon Nov 10 17:50:06 2008
@@ -22,6 +22,7 @@
 import org.apache.commons.logging.LogFactory;
 
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -29,6 +30,8 @@
 import java.lang.Void;
 
 import org.apache.hadoop.hive.ql.exec.FunctionInfo.OperatorType;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.groupByDesc;
 import org.apache.hadoop.hive.ql.udf.*;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 
@@ -247,7 +250,7 @@
 
   /**
    * This method is shared between UDFRegistry and UDAFRegistry.
-   * methodName will be "evaluate" for UDFRegistry, and "aggregate" for UDAFRegistry. 
+   * methodName will be "evaluate" for UDFRegistry, and "aggregate"/"evaluate"/"evaluatePartial" for UDAFRegistry. 
    */
   public static <T> Method getMethodInternal(Class<? extends T> udfClass, String methodName, boolean exact, List<Class<?>> argumentClasses) {
     int leastImplicitConversions = Integer.MAX_VALUE;
@@ -319,6 +322,9 @@
     return result;
   }
 
+  /**
+   * Returns the "aggregate" method of the UDAF.
+   */
   public static Method getUDAFMethod(String name, List<Class<?>> argumentClasses) {
     Class<? extends UDAF> udaf = getUDAF(name);
     if (udaf == null)
@@ -327,7 +333,62 @@
                                          argumentClasses);
   }
 
+  /**
+   * Returns the evaluate method for the UDAF based on the aggregation mode.
+   * See groupByDesc.Mode for details.
+   * 
+   * @param name  name of the UDAF
+   * @param mode  the mode of the aggregation
+   * @return      null if no such UDAF is found
+   */
+  public static Method getUDAFEvaluateMethod(String name, groupByDesc.Mode mode) {
+    Class<? extends UDAF> udaf = getUDAF(name);
+    if (udaf == null)
+      return null;
+    return FunctionRegistry.getMethodInternal(udaf, 
+        (mode == groupByDesc.Mode.COMPLETE || mode == groupByDesc.Mode.FINAL) 
+        ? "evaluate" : "evaluatePartial", true,
+        new ArrayList<Class<?>>() );
+  }
+
+  /**
+   * Returns the "aggregate" method of the UDAF.
+   */
   public static Method getUDAFMethod(String name, Class<?>... argumentClasses) {
     return getUDAFMethod(name, Arrays.asList(argumentClasses));
   }
+  
+  public static Object invoke(Method m, Object thisObject, Object[] arguments) throws HiveException {
+    Object o;
+    try {
+      o = m.invoke(thisObject, arguments);
+    } catch (Exception e) {
+      String thisObjectString = "" + thisObject + " of class " + 
+        (thisObject == null? "null" : thisObject.getClass().getName());
+
+      StringBuilder argumentString = new StringBuilder();
+      if (arguments == null) {
+        argumentString.append("null");
+      } else {
+        argumentString.append("{");
+        for (int i=0; i<arguments.length; i++) {
+          if (i>0) {
+            argumentString.append(", ");
+          }
+          if (arguments[i] == null) {
+            argumentString.append("null");
+          } else {
+            argumentString.append("" + arguments[i] + ":" + arguments[i].getClass().getName());
+          }
+        }
+        argumentString.append("} of size " + arguments.length);
+      }
+      
+      throw new HiveException("Unable to execute method " + m + " " 
+          + " on object " + thisObjectString
+          + " with arguments " + argumentString.toString() 
+          + ":" + e.getMessage());
+    }
+    return o;
+  }
 }
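
FunctionRegistry.invoke() above centralizes the reflective UDF/UDAF call and converts any failure into a HiveException that spells out the target object and the arguments. A cut-down, self-contained version of the same pattern on plain JDK reflection follows; the toy UDF-like class and the RuntimeException stand-in for HiveException are invented for the example.

import java.lang.reflect.Method;
import java.util.Arrays;

public class InvokeSketch {
  // toy stand-in for a UDF's evaluate method
  public static class UpperUDF {
    public String evaluate(String s) { return s == null ? null : s.toUpperCase(); }
  }

  static Object invoke(Method m, Object thisObject, Object[] arguments) {
    try {
      return m.invoke(thisObject, arguments);
    } catch (Exception e) {
      // same idea as the patch: report the object and the arguments that failed
      throw new RuntimeException("Unable to execute method " + m
          + " on object " + thisObject
          + " with arguments " + Arrays.toString(arguments)
          + ": " + e.getMessage(), e);
    }
  }

  public static void main(String[] args) throws Exception {
    UpperUDF udf = new UpperUDF();
    Method m = UpperUDF.class.getMethod("evaluate", String.class);
    System.out.println(invoke(m, udf, new Object[]{"hive"})); // HIVE
  }
}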

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Mon Nov 10 17:50:06 2008
@@ -18,9 +18,12 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Iterator;
+import java.util.Map;
 import java.io.Serializable;
 import java.lang.reflect.Method;
 
@@ -32,6 +35,8 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
 
 /**
  * GroupBy operator implementation.
@@ -61,98 +66,112 @@
   transient protected HashMap<ArrayList<Object>, UDAF[]> hashAggregations;
   
   transient boolean firstRow;
-  
+  transient long    totalMemory;
+  transient boolean hashAggr;
+
   public void initialize(Configuration hconf) throws HiveException {
     super.initialize(hconf);
-    try {
-      // init keyFields
-      keyFields = new ExprNodeEvaluator[conf.getKeys().size()];
-      for (int i = 0; i < keyFields.length; i++) {
-        keyFields[i] = ExprNodeEvaluatorFactory.get(conf.getKeys().get(i));
-      }
-    
-      // init aggregationParameterFields
-      aggregationParameterFields = new ExprNodeEvaluator[conf.getAggregators().size()][];
-      for (int i = 0; i < aggregationParameterFields.length; i++) {
-        ArrayList<exprNodeDesc> parameters = conf.getAggregators().get(i).getParameters();
-        aggregationParameterFields[i] = new ExprNodeEvaluator[parameters.size()];
-        for (int j = 0; j < parameters.size(); j++) {
-          aggregationParameterFields[i][j] = ExprNodeEvaluatorFactory.get(parameters.get(j));
-        }
-      }
-      // init aggregationIsDistinct
-      aggregationIsDistinct = new boolean[conf.getAggregators().size()];
-      for(int i=0; i<aggregationIsDistinct.length; i++) {
-        aggregationIsDistinct[i] = conf.getAggregators().get(i).getDistinct();
-      }
+    totalMemory = Runtime.getRuntime().totalMemory();
 
-      // init aggregationClasses  
-      aggregationClasses = (Class<? extends UDAF>[]) new Class[conf.getAggregators().size()];
-      for (int i = 0; i < conf.getAggregators().size(); i++) {
-        aggregationDesc agg = conf.getAggregators().get(i);
-        aggregationClasses[i] = agg.getAggregationClass();
+    // init keyFields
+    keyFields = new ExprNodeEvaluator[conf.getKeys().size()];
+    for (int i = 0; i < keyFields.length; i++) {
+      keyFields[i] = ExprNodeEvaluatorFactory.get(conf.getKeys().get(i));
+    }
+  
+    // init aggregationParameterFields
+    aggregationParameterFields = new ExprNodeEvaluator[conf.getAggregators().size()][];
+    for (int i = 0; i < aggregationParameterFields.length; i++) {
+      ArrayList<exprNodeDesc> parameters = conf.getAggregators().get(i).getParameters();
+      aggregationParameterFields[i] = new ExprNodeEvaluator[parameters.size()];
+      for (int j = 0; j < parameters.size(); j++) {
+        aggregationParameterFields[i][j] = ExprNodeEvaluatorFactory.get(parameters.get(j));
       }
+    }
+    // init aggregationIsDistinct
+    aggregationIsDistinct = new boolean[conf.getAggregators().size()];
+    for(int i=0; i<aggregationIsDistinct.length; i++) {
+      aggregationIsDistinct[i] = conf.getAggregators().get(i).getDistinct();
+    }
+
+    // init aggregationClasses  
+    aggregationClasses = (Class<? extends UDAF>[]) new Class[conf.getAggregators().size()];
+    for (int i = 0; i < conf.getAggregators().size(); i++) {
+      aggregationDesc agg = conf.getAggregators().get(i);
+      aggregationClasses[i] = agg.getAggregationClass();
+    }
 
-      // init aggregations, aggregationsAggregateMethods,
+    // init aggregations, aggregationsAggregateMethods,
+    // aggregationsEvaluateMethods
+    aggregationsAggregateMethods = new Method[aggregationClasses.length];
+    aggregationsEvaluateMethods = new Method[aggregationClasses.length];
+    String evaluateMethodName = ((conf.getMode() == groupByDesc.Mode.PARTIAL1 || conf.getMode() == groupByDesc.Mode.HASH ||
+                                  conf.getMode() == groupByDesc.Mode.PARTIAL2)
+                                 ? "evaluatePartial" : "evaluate");
+
+    for(int i=0; i<aggregationClasses.length; i++) {
+      String aggregateMethodName = (((conf.getMode() == groupByDesc.Mode.PARTIAL1) || (conf.getMode() == groupByDesc.Mode.HASH)) ? "aggregate" : "aggregatePartial");
+
+      if (aggregationIsDistinct[i] && (conf.getMode() != groupByDesc.Mode.FINAL))
+        aggregateMethodName = "aggregate";
+      // aggregationsAggregateMethods
+      for( Method m : aggregationClasses[i].getMethods() ){
+        if( m.getName().equals( aggregateMethodName ) 
+            && m.getParameterTypes().length == aggregationParameterFields[i].length) {              
+          aggregationsAggregateMethods[i] = m;
+          break;
+        }
+      }
+      if (null == aggregationsAggregateMethods[i]) {
+        throw new HiveException("Cannot find " + aggregateMethodName + " method of UDAF class "
+                                 + aggregationClasses[i].getName() + " that accepts "
+                                 + aggregationParameterFields[i].length + " parameters!");
+      }
       // aggregationsEvaluateMethods
-      aggregationsAggregateMethods = new Method[aggregationClasses.length];
-      aggregationsEvaluateMethods = new Method[aggregationClasses.length];
-      String aggregateMethodName = (conf.getMode() == groupByDesc.Mode.PARTIAL2 
-         ? "aggregatePartial" : "aggregate");
-      String evaluateMethodName = ((conf.getMode() == groupByDesc.Mode.PARTIAL1 || conf.getMode() == groupByDesc.Mode.HASH)
-         ? "evaluatePartial" : "evaluate");
-      for(int i=0; i<aggregationClasses.length; i++) {
-        // aggregationsAggregateMethods
-        for( Method m : aggregationClasses[i].getMethods() ){
-          if( m.getName().equals( aggregateMethodName ) 
-              && m.getParameterTypes().length == aggregationParameterFields[i].length) {              
-            aggregationsAggregateMethods[i] = m;
-            break;
-          }
-        }
-        if (null == aggregationsAggregateMethods[i]) {
-          throw new RuntimeException("Cannot find " + aggregateMethodName + " method of UDAF class "
-                                   + aggregationClasses[i].getName() + " that accepts "
-                                   + aggregationParameterFields[i].length + " parameters!");
-        }
-        // aggregationsEvaluateMethods
+      try {
         aggregationsEvaluateMethods[i] = aggregationClasses[i].getMethod(evaluateMethodName);
-
-        if (null == aggregationsEvaluateMethods[i]) {
-          throw new RuntimeException("Cannot find " + evaluateMethodName + " method of UDAF class "
-                                   + aggregationClasses[i].getName() + "!");
-        }
-        assert(aggregationsEvaluateMethods[i] != null);
+      } catch (Exception e) {
+        throw new HiveException("Unable to get the method named " + evaluateMethodName + " from " 
+            + aggregationClasses[i] + ": " + e.getMessage());
       }
 
-      if (conf.getMode() != groupByDesc.Mode.HASH) {
-        aggregationsParametersLastInvoke = new Object[conf.getAggregators().size()][];
-        aggregations = newAggregations();
-      } else {
-        hashAggregations = new HashMap<ArrayList<Object>, UDAF[]>();
-      }
-      // init objectInspectors
-      int totalFields = keyFields.length + aggregationClasses.length;
-      objectInspectors = new ArrayList<ObjectInspector>(totalFields);
-      for(int i=0; i<keyFields.length; i++) {
-        objectInspectors.add(null);
-      }
-      for(int i=0; i<aggregationClasses.length; i++) {
-        objectInspectors.add(ObjectInspectorFactory.getStandardPrimitiveObjectInspector(
-            aggregationsEvaluateMethods[i].getReturnType()));
+      if (null == aggregationsEvaluateMethods[i]) {
+        throw new HiveException("Cannot find " + evaluateMethodName + " method of UDAF class "
+                                 + aggregationClasses[i].getName() + "!");
       }
-      
-      firstRow = true;
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new RuntimeException(e);
+      assert(aggregationsEvaluateMethods[i] != null);
+    }
+
+    aggregationsParametersLastInvoke = new Object[conf.getAggregators().size()][];
+    if (conf.getMode() != groupByDesc.Mode.HASH) {
+      aggregations = newAggregations();
+      hashAggr = false;
+    } else {
+      hashAggregations = new HashMap<ArrayList<Object>, UDAF[]>();
+      hashAggr = true;
     }
+    // init objectInspectors
+    int totalFields = keyFields.length + aggregationClasses.length;
+    objectInspectors = new ArrayList<ObjectInspector>(totalFields);
+    for(int i=0; i<keyFields.length; i++) {
+      objectInspectors.add(null);
+    }
+    for(int i=0; i<aggregationClasses.length; i++) {
+      objectInspectors.add(ObjectInspectorFactory.getStandardPrimitiveObjectInspector(
+          aggregationsEvaluateMethods[i].getReturnType()));
+    }
+    
+    firstRow = true;
   }
 
-  protected UDAF[] newAggregations() throws Exception {      
+  protected UDAF[] newAggregations() throws HiveException {      
     UDAF[] aggs = new UDAF[aggregationClasses.length];
     for(int i=0; i<aggregationClasses.length; i++) {
-      aggs[i] = aggregationClasses[i].newInstance();
+      try {
+        aggs[i] = aggregationClasses[i].newInstance();
+      } catch (Exception e) {
+        throw new HiveException("Unable to create an instance of class " + aggregationClasses[i] + ": " + e.getMessage());
+      }
       aggs[i].init();
     }
     return aggs;
@@ -160,7 +179,8 @@
 
   InspectableObject tempInspectableObject = new InspectableObject();
   
-  protected void updateAggregations(UDAF[] aggs, Object row, ObjectInspector rowInspector, Object[][] lastInvoke) throws Exception {
+  protected void updateAggregations(UDAF[] aggs, Object row, ObjectInspector rowInspector, boolean hashAggr, boolean newEntry,
+                                    Object[][] lastInvoke) throws HiveException {
     for(int ai=0; ai<aggs.length; ai++) {
       // Calculate the parameters 
       Object[] o = new Object[aggregationParameterFields[ai].length];
@@ -168,24 +188,35 @@
         aggregationParameterFields[ai][pi].evaluate(row, rowInspector, tempInspectableObject);
         o[pi] = tempInspectableObject.o; 
       }
+
       // Update the aggregations.
-      if (aggregationIsDistinct[ai] && lastInvoke != null) {
-        // different differentParameters?
-        boolean differentParameters = (lastInvoke[ai] == null);
-        if (!differentParameters) {
-          for(int pi=0; pi<o.length; pi++) {
-            if (!o[pi].equals(lastInvoke[ai][pi])) {
-              differentParameters = true;
-              break;
-            }
+      if (aggregationIsDistinct[ai]) {
+        if (hashAggr) {
+          if (newEntry) {
+            FunctionRegistry.invoke(aggregationsAggregateMethods[ai], aggs[ai], o);
           }
-        }  
-        if (differentParameters) {
-          aggregationsAggregateMethods[ai].invoke(aggs[ai], o);
-          lastInvoke[ai] = o;
         }
-      } else {
-        aggregationsAggregateMethods[ai].invoke(aggs[ai], o);
+        else {
+          boolean differentParameters = false;
+          if ((lastInvoke == null) || (lastInvoke[ai] == null))
+            differentParameters = true;
+          else {
+            for(int pi=0; pi<o.length; pi++) {
+              if (!o[pi].equals(lastInvoke[ai][pi])) {
+                differentParameters = true;
+                break;
+              }
+            }  
+          }
+
+          if (differentParameters) {
+            FunctionRegistry.invoke(aggregationsAggregateMethods[ai], aggs[ai], o);
+            lastInvoke[ai] = o;
+          }
+        }
+      }
+      else {
+        FunctionRegistry.invoke(aggregationsAggregateMethods[ai], aggs[ai], o);
       }
     }
   }
@@ -208,53 +239,98 @@
         for(int i=0; i<objectInspectors.size(); i++) {
           fieldNames.add(Integer.valueOf(i).toString());
         }
-        outputObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
-          fieldNames, objectInspectors);
-      }
-      // Prepare aggs for updating
-      UDAF[] aggs = null;
-      Object[][] lastInvoke = null;
-      if (aggregations != null) {
-        // sort-based aggregation
-        // Need to forward?
-        boolean keysAreEqual = newKeys.equals(currentKeys);
-        if (currentKeys != null && !keysAreEqual) {
-          forward(currentKeys, aggregations);
-        }
-        // Need to update the keys?
-        if (currentKeys == null || !keysAreEqual) {
-          currentKeys = newKeys;
-          // init aggregations
-          for(UDAF aggregation: aggregations) {
-            aggregation.init();
-          }
-          // clear parameters in last-invoke
-          for(int i=0; i<aggregationsParametersLastInvoke.length; i++) {
-            aggregationsParametersLastInvoke[i] = null;
-          }
-        }
-        aggs = aggregations;
-        lastInvoke = aggregationsParametersLastInvoke;
-      } else {
-        // hash-based aggregations
-        aggs = hashAggregations.get(newKeys);
-        if (aggs == null) {
-          aggs = newAggregations();
-          hashAggregations.put(newKeys, aggs);
-          // TODO: Hash aggregation does not support DISTINCT now
-          lastInvoke = null;
-        }
+        outputObjectInspector = 
+          ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, objectInspectors);
       }
 
-      // Update the aggs
-      updateAggregations(aggs, row, rowInspector, lastInvoke);
-
+      if (hashAggr)
+        processHashAggr(row, rowInspector, newKeys);
+      else
+        processAggr(row, rowInspector, newKeys);
+    } catch (HiveException e) {
+      throw e;
     } catch (Exception e) {
-      e.printStackTrace();
       throw new HiveException(e);
     }
   }
-  
+
+  private void processHashAggr(Object row, ObjectInspector rowInspector, ArrayList<Object> newKeys) throws HiveException {
+    // Prepare aggs for updating
+    UDAF[] aggs = null;
+    boolean newEntry = false;
+
+    // hash-based aggregations
+    aggs = hashAggregations.get(newKeys);
+    if (aggs == null) {
+      aggs = newAggregations();
+      hashAggregations.put(newKeys, aggs);
+      newEntry = true;
+    }
+    
+    // Update the aggs
+    updateAggregations(aggs, row, rowInspector, true, newEntry, null);
+    
+    // currently, we use a simple approximation - if 90% of memory is being
+    // used, flush 
+    long freeMemory = Runtime.getRuntime().freeMemory();
+    if (shouldBeFlushed(totalMemory, freeMemory)) {
+      flush();
+    }
+  }
+
+  private void processAggr(Object row, ObjectInspector rowInspector, ArrayList<Object> newKeys) throws HiveException {
+    // Prepare aggs for updating
+    UDAF[] aggs = null;
+    Object[][] lastInvoke = null;
+    boolean keysAreEqual = newKeys.equals(currentKeys);
+    
+    // forward the current keys if needed for sort-based aggregation
+    if (currentKeys != null && !keysAreEqual)
+      forward(currentKeys, aggregations);
+    
+    // Need to update the keys?
+    if (currentKeys == null || !keysAreEqual) {
+      currentKeys = newKeys;
+      
+      // init aggregations
+      for(UDAF aggregation: aggregations)
+        aggregation.init();
+      
+      // clear parameters in last-invoke
+      for(int i=0; i<aggregationsParametersLastInvoke.length; i++)
+        aggregationsParametersLastInvoke[i] = null;
+    }
+    
+    aggs = aggregations;
+    
+    lastInvoke = aggregationsParametersLastInvoke;
+    // Update the aggs
+    updateAggregations(aggs, row, rowInspector, false, false, lastInvoke);
+  }
+
+  private boolean shouldBeFlushed(long total, long free) {
+    if (10 * free >= total)
+      return true;
+    return false;
+  }
+
+  private void flush() throws HiveException {
+    // Currently, the algorithm flushes 10% of the entries - this can be
+    // changed in the future
+
+    int oldSize = hashAggregations.size();
+    Iterator iter = hashAggregations.entrySet().iterator();
+    int numDel = 0;
+    while (iter.hasNext()) {
+      Map.Entry<ArrayList<Object>, UDAF[]> m = (Map.Entry)iter.next();
+      forward(m.getKey(), m.getValue());
+      iter.remove();
+      numDel++;
+      if (numDel * 10 >= oldSize)
+        return;
+    }
+  }
+
   /**
    * Forward a record of keys and aggregation results.
    * 
@@ -262,14 +338,19 @@
    *          The keys in the record
    * @throws HiveException
    */
-  protected void forward(ArrayList<Object> keys, UDAF[] aggs) throws Exception {
+  protected void forward(ArrayList<Object> keys, UDAF[] aggs) throws HiveException {
     int totalFields = keys.size() + aggs.length;
     List<Object> a = new ArrayList<Object>(totalFields);
     for(int i=0; i<keys.size(); i++) {
       a.add(keys.get(i));
     }
     for(int i=0; i<aggs.length; i++) {
-      a.add(aggregationsEvaluateMethods[i].invoke(aggs[i]));
+      try {
+        a.add(aggregationsEvaluateMethods[i].invoke(aggs[i]));
+      } catch (Exception e) {
+        throw new HiveException("Unable to execute UDAF function " + aggregationsEvaluateMethods[i] + " " 
+            + " on object " + "(" + aggs[i] + ") " + ": " + e.getMessage());
+      }
     }
     forward(a, outputObjectInspector);
   }
@@ -304,4 +385,20 @@
     super.close(abort);
   }
 
+  // Group by contains the columns needed - no need to aggregate from children
+  public List<String> genColLists(HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx) {
+    List<String> colLists = new ArrayList<String>();
+    ArrayList<exprNodeDesc> keys = conf.getKeys();
+    for (exprNodeDesc key : keys)
+      colLists = Utilities.mergeUniqElems(colLists, key.getCols());
+    
+    ArrayList<aggregationDesc> aggrs = conf.getAggregators();
+    for (aggregationDesc aggr : aggrs) { 
+      ArrayList<exprNodeDesc> params = aggr.getParameters();
+      for (exprNodeDesc param : params) 
+        colLists = Utilities.mergeUniqElems(colLists, param.getCols());
+    }
+
+    return colLists;
+  }
 }
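
The GroupByOperator rewrite above separates hash-based from sort-based aggregation and, in the hash path, flushes roughly 10% of the in-memory hash table when memory runs low (the in-code comment describes the trigger as "if 90% of memory is being used, flush"). Below is a simplified sketch of that flush policy; it follows the comment's description of the threshold and uses a plain counter map in place of the UDAF instances.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class HashAggrFlushSketch {
  private final Map<List<Object>, long[]> hashAggregations = new HashMap<List<Object>, long[]>();
  private final long totalMemory = Runtime.getRuntime().totalMemory();

  // flush when free memory drops below ~10% of the total, per the patch's stated intent
  private boolean shouldBeFlushed(long total, long free) {
    return 10 * free <= total;
  }

  // forward and drop roughly 10% of the entries, like GroupByOperator.flush()
  private void flush() {
    int oldSize = hashAggregations.size();
    int numDel = 0;
    Iterator<Map.Entry<List<Object>, long[]>> iter = hashAggregations.entrySet().iterator();
    while (iter.hasNext()) {
      Map.Entry<List<Object>, long[]> m = iter.next();
      System.out.println("forward " + m.getKey() + " -> " + m.getValue()[0]);
      iter.remove();
      numDel++;
      if (numDel * 10 >= oldSize) {
        return;
      }
    }
  }

  void processHashAggr(List<Object> newKeys) {
    long[] agg = hashAggregations.get(newKeys);
    if (agg == null) {
      agg = new long[]{0};
      hashAggregations.put(newKeys, agg); // new entry, like hashAggregations.put(newKeys, aggs)
    }
    agg[0]++; // stand-in for updateAggregations()
    if (shouldBeFlushed(totalMemory, Runtime.getRuntime().freeMemory())) {
      flush();
    }
  }

  public static void main(String[] args) {
    HashAggrFlushSketch op = new HashAggrFlushSketch();
    for (int i = 0; i < 1000; i++) {
      List<Object> key = new ArrayList<Object>();
      key.add("key" + (i % 50));
      op.processHashAggr(key);
    }
    System.out.println("remaining groups: " + op.hashAggregations.size());
  }
}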

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Mon Nov 10 17:50:06 2008
@@ -26,6 +26,9 @@
 import java.util.Stack;
 import java.util.Vector;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.exprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
@@ -42,7 +45,9 @@
  */
 public class JoinOperator extends Operator<joinDesc> implements Serializable {
 
-  // a list of value expressions for each alias are maintained 
+  static final private Log LOG = LogFactory.getLog(JoinOperator.class.getName());
+  
+  // a list of value expressions for each alias are maintained
   public static class JoinExprMap {
     ExprNodeEvaluator[] valueFields;
 
@@ -56,62 +61,79 @@
 
   }
 
-  public static class IntermediateObject{
+  public static class IntermediateObject {
     ArrayList<Object>[] objs;
     int curSize;
 
     public IntermediateObject(ArrayList<Object>[] objs, int curSize) {
-      this.objs  = objs;
+      this.objs = objs;
       this.curSize = curSize;
     }
 
-    public ArrayList<Object>[] getObjs() { return objs; }
-    public int getCurSize() { return curSize; }
-    public void pushObj(ArrayList<Object> obj) { objs[curSize++] = obj; }
-    public void popObj() { curSize--; }
+    public ArrayList<Object>[] getObjs() {
+      return objs;
+    }
+
+    public int getCurSize() {
+      return curSize;
+    }
+
+    public void pushObj(ArrayList<Object> obj) {
+      objs[curSize++] = obj;
+    }
+
+    public void popObj() {
+      curSize--;
+    }
   }
 
   transient protected int numValues; // number of aliases
   transient static protected ExprNodeEvaluator aliasField;
+  transient static protected ExprNodeEvaluator keyField;
   transient protected HashMap<Byte, JoinExprMap> joinExprs;
-  transient static protected Byte[] order; // order in which the results should be outputted
+  transient static protected Byte[] order; // order in which the results should
+                                           // be outputted
   transient protected joinCond[] condn;
   transient protected boolean noOuterJoin;
-  transient private Object[] dummyObj; // for outer joins, contains the potential nulls for the concerned aliases
+  transient private Object[] dummyObj; // for outer joins, contains the
+                                       // potential nulls for the concerned
+                                       // aliases
   transient private Vector<ArrayList<Object>>[] dummyObjVectors;
   transient private Stack<Iterator<ArrayList<Object>>> iterators;
   transient private int totalSz; // total size of the composite object
   transient ObjectInspector joinOutputObjectInspector;
-  
-  static
-  {
-    aliasField = ExprNodeEvaluatorFactory.get(new exprNodeColumnDesc(String.class, Utilities.ReduceField.ALIAS.toString()));
+
+  static {
+    aliasField = ExprNodeEvaluatorFactory.get(new exprNodeColumnDesc(
+        String.class, Utilities.ReduceField.ALIAS.toString()));
+    keyField = ExprNodeEvaluatorFactory.get(new exprNodeColumnDesc(
+        String.class, Utilities.ReduceField.KEY.toString()));
   }
-  
-  HashMap<Byte, Vector<ArrayList<Object>>> storage;
 
+  HashMap<Byte, Vector<ArrayList<Object>>> storage;
+  int joinEmitInterval = -1;
+  
   public void initialize(Configuration hconf) throws HiveException {
     super.initialize(hconf);
     totalSz = 0;
     // Map that contains the rows for each alias
     storage = new HashMap<Byte, Vector<ArrayList<Object>>>();
-    
+
     numValues = conf.getExprs().size();
     joinExprs = new HashMap<Byte, JoinExprMap>();
-    if (order == null)
-    {
+    if (order == null) {
       order = new Byte[numValues];
       for (int i = 0; i < numValues; i++)
-        order[i] = (byte)i;
+        order[i] = (byte) i;
     }
     condn = conf.getConds();
     noOuterJoin = conf.getNoOuterJoin();
     Map<Byte, ArrayList<exprNodeDesc>> map = conf.getExprs();
     Iterator entryIter = map.entrySet().iterator();
     while (entryIter.hasNext()) {
-      Map.Entry e = (Map.Entry)entryIter.next();
-      Byte key = (Byte)e.getKey();
-      ArrayList<exprNodeDesc> expr = (ArrayList<exprNodeDesc>)e.getValue();
+      Map.Entry e = (Map.Entry) entryIter.next();
+      Byte key = (Byte) e.getKey();
+      ArrayList<exprNodeDesc> expr = (ArrayList<exprNodeDesc>) e.getValue();
       int sz = expr.size();
       totalSz += sz;
 
@@ -123,12 +145,15 @@
       joinExprs.put(key, new JoinExprMap(valueFields));
     }
 
-    ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>(totalSz);
-    for(int i=0; i<totalSz; i++) {
-      structFieldObjectInspectors.add(ObjectInspectorFactory.getStandardPrimitiveObjectInspector(String.class));
-    }
-    joinOutputObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
-        ObjectInspectorUtils.getIntegerArray(totalSz), structFieldObjectInspectors);
+    ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>(
+        totalSz);
+    for (int i = 0; i < totalSz; i++) {
+      structFieldObjectInspectors.add(ObjectInspectorFactory
+          .getStandardPrimitiveObjectInspector(String.class));
+    }
+    joinOutputObjectInspector = ObjectInspectorFactory
+        .getStandardStructObjectInspector(ObjectInspectorUtils
+            .getIntegerArray(totalSz), structFieldObjectInspectors);
 
     dummyObj = new Object[numValues];
     dummyObjVectors = new Vector[numValues];
@@ -149,6 +174,8 @@
     }
 
     iterators = new Stack<Iterator<ArrayList<Object>>>();
+    
+    joinEmitInterval = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME);
   }
 
   public void startGroup() throws HiveException {
@@ -159,7 +186,9 @@
   }
 
   InspectableObject tempAliasInspectableObject = new InspectableObject();
-  public void process(Object row, ObjectInspector rowInspector) throws HiveException {
+
+  public void process(Object row, ObjectInspector rowInspector)
+      throws HiveException {
     try {
       // get alias
       aliasField.evaluate(row, rowInspector, tempAliasInspectableObject);
@@ -176,15 +205,40 @@
         nr.add(tempAliasInspectableObject.o);
       }
 
+      // Are we consuming too much memory
+      if (storage.get(alias).size() == joinEmitInterval) {
+        if (alias == numValues - 1) {
+          // The input is sorted by alias, so if we are already on the last
+          // join operand we can emit some results now. Note that this has to
+          // be done before adding the current row to the storage, to
+          // preserve correctness for outer joins.
+          checkAndGenObject();
+          storage.get(alias).clear();
+        } else {
+          // Output a warning once a join operand accumulates joinEmitInterval
+          // rows. We don't warn for the last join operand, since its buffer
+          // is flushed above and never grows past joinEmitInterval.
+          InspectableObject io = new InspectableObject();
+          keyField.evaluate(row, rowInspector, io);
+          LOG.warn("table " + alias
+              + " has more than joinEmitInterval rows for join key " + io.o);
+        }
+      }
+
       // Add the value to the vector
       storage.get(alias).add(nr);
+
     } catch (Exception e) {
       e.printStackTrace();
       throw new HiveException(e);
     }
   }
 
-  private void createForwardJoinObject(IntermediateObject intObj, boolean[] nullsArr) throws HiveException {
+  private void createForwardJoinObject(IntermediateObject intObj,
+      boolean[] nullsArr) throws HiveException {
     ArrayList<Object> nr = new ArrayList<Object>(totalSz);
     for (int i = 0; i < numValues; i++) {
       Byte alias = order[i];
@@ -204,15 +258,17 @@
   }
 
   private void copyOldArray(boolean[] src, boolean[] dest) {
-    for (int i = 0; i < src.length; i++) dest[i] = src[i];
+    for (int i = 0; i < src.length; i++)
+      dest[i] = src[i];
   }
 
-  private Vector<boolean[]> joinObjectsInnerJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, ArrayList<Object> newObj, IntermediateObject intObj, int left, boolean newObjNull)
-  {
-    if (newObjNull) return resNulls;
+  private Vector<boolean[]> joinObjectsInnerJoin(Vector<boolean[]> resNulls,
+      Vector<boolean[]> inputNulls, ArrayList<Object> newObj,
+      IntermediateObject intObj, int left, boolean newObjNull) {
+    if (newObjNull)
+      return resNulls;
     Iterator<boolean[]> nullsIter = inputNulls.iterator();
-    while (nullsIter.hasNext())
-    {
+    while (nullsIter.hasNext()) {
       boolean[] oldNulls = nullsIter.next();
       boolean oldObjNull = oldNulls[left];
       if (!oldObjNull) {
@@ -224,12 +280,13 @@
     }
     return resNulls;
   }
-  
-  private Vector<boolean[]> joinObjectsLeftOuterJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, ArrayList<Object> newObj, IntermediateObject intObj, int left, boolean newObjNull)
-  {
+
+  private Vector<boolean[]> joinObjectsLeftOuterJoin(
+      Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls,
+      ArrayList<Object> newObj, IntermediateObject intObj, int left,
+      boolean newObjNull) {
     Iterator<boolean[]> nullsIter = inputNulls.iterator();
-    while (nullsIter.hasNext())
-    {
+    while (nullsIter.hasNext()) {
       boolean[] oldNulls = nullsIter.next();
       boolean oldObjNull = oldNulls[left];
       boolean[] newNulls = new boolean[intObj.getCurSize()];
@@ -243,25 +300,25 @@
     return resNulls;
   }
 
-  private Vector<boolean[]> joinObjectsRightOuterJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, ArrayList<Object> newObj, IntermediateObject intObj, int left, boolean newObjNull)
-  {
-    if (newObjNull) return resNulls;
+  private Vector<boolean[]> joinObjectsRightOuterJoin(
+      Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls,
+      ArrayList<Object> newObj, IntermediateObject intObj, int left,
+      boolean newObjNull) {
+    if (newObjNull)
+      return resNulls;
     boolean allOldObjsNull = true;
 
     Iterator<boolean[]> nullsIter = inputNulls.iterator();
-    while (nullsIter.hasNext())
-    {
+    while (nullsIter.hasNext()) {
       boolean[] oldNulls = nullsIter.next();
-      if (!oldNulls[left])
-      {
+      if (!oldNulls[left]) {
         allOldObjsNull = false;
         break;
       }
     }
 
     nullsIter = inputNulls.iterator();
-    while (nullsIter.hasNext())
-    {
+    while (nullsIter.hasNext()) {
       boolean[] oldNulls = nullsIter.next();
       boolean oldObjNull = oldNulls[left];
 
@@ -270,8 +327,7 @@
         copyOldArray(oldNulls, newNulls);
         newNulls[oldNulls.length] = newObjNull;
         resNulls.add(newNulls);
-      }
-      else if (allOldObjsNull) {
+      } else if (allOldObjsNull) {
         boolean[] newNulls = new boolean[intObj.getCurSize()];
         for (int i = 0; i < intObj.getCurSize() - 1; i++)
           newNulls[i] = true;
@@ -282,12 +338,13 @@
     return resNulls;
   }
 
-  private Vector<boolean[]> joinObjectsFullOuterJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, ArrayList<Object> newObj, IntermediateObject intObj, int left, boolean newObjNull)
-  {
+  private Vector<boolean[]> joinObjectsFullOuterJoin(
+      Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls,
+      ArrayList<Object> newObj, IntermediateObject intObj, int left,
+      boolean newObjNull) {
     if (newObjNull) {
       Iterator<boolean[]> nullsIter = inputNulls.iterator();
-      while (nullsIter.hasNext())
-      {
+      while (nullsIter.hasNext()) {
         boolean[] oldNulls = nullsIter.next();
         boolean[] newNulls = new boolean[intObj.getCurSize()];
         copyOldArray(oldNulls, newNulls);
@@ -296,15 +353,13 @@
       }
       return resNulls;
     }
-    
+
     boolean allOldObjsNull = true;
 
     Iterator<boolean[]> nullsIter = inputNulls.iterator();
-    while (nullsIter.hasNext())
-    {
+    while (nullsIter.hasNext()) {
       boolean[] oldNulls = nullsIter.next();
-      if (!oldNulls[left])
-      {
+      if (!oldNulls[left]) {
         allOldObjsNull = false;
         break;
       }
@@ -312,24 +367,21 @@
     boolean rhsPreserved = false;
 
     nullsIter = inputNulls.iterator();
-    while (nullsIter.hasNext())
-    {
+    while (nullsIter.hasNext()) {
       boolean[] oldNulls = nullsIter.next();
       boolean oldObjNull = oldNulls[left];
 
-      if (!oldObjNull)   
-      {
+      if (!oldObjNull) {
         boolean[] newNulls = new boolean[intObj.getCurSize()];
         copyOldArray(oldNulls, newNulls);
         newNulls[oldNulls.length] = newObjNull;
         resNulls.add(newNulls);
-      }
-      else if (oldObjNull) {
+      } else if (oldObjNull) {
         boolean[] newNulls = new boolean[intObj.getCurSize()];
         copyOldArray(oldNulls, newNulls);
         newNulls[oldNulls.length] = true;
         resNulls.add(newNulls);
-         
+
         if (allOldObjsNull && !rhsPreserved) {
           newNulls = new boolean[intObj.getCurSize()];
           for (int i = 0; i < oldNulls.length; i++)
@@ -344,35 +396,35 @@
   }
 
   /*
-   * The new input is added to the list of existing inputs. Each entry in the 
-   * array of inputNulls denotes the entries in the intermediate object to
-   * be used. The intermediate object is augmented with the new object, and 
-   * list of nulls is changed appropriately. The list will contain all non-nulls
-   * for a inner join. The outer joins are processed appropriately.
+   * The new input is added to the list of existing inputs. Each entry in the
+   * array of inputNulls denotes which entries in the intermediate object are
+   * to be used. The intermediate object is augmented with the new object, and
+   * the list of nulls is updated accordingly. The list contains all non-nulls
+   * for an inner join; the outer-join variants are handled by the methods
+   * above.
    */
-  private Vector<boolean[]> joinObjects(Vector<boolean[]> inputNulls, ArrayList<Object> newObj, IntermediateObject intObj, int joinPos)
-  {
+  private Vector<boolean[]> joinObjects(Vector<boolean[]> inputNulls,
+      ArrayList<Object> newObj, IntermediateObject intObj, int joinPos) {
     Vector<boolean[]> resNulls = new Vector<boolean[]>();
     boolean newObjNull = newObj == dummyObj[joinPos] ? true : false;
-    if (joinPos == 0)
-    {
-      if (newObjNull) return null;
+    if (joinPos == 0) {
+      if (newObjNull)
+        return null;
       boolean[] nulls = new boolean[1];
       nulls[0] = newObjNull;
       resNulls.add(nulls);
       return resNulls;
     }
-    
+
     int left = condn[joinPos - 1].getLeft();
     int type = condn[joinPos - 1].getType();
-    
+
     // process all nulls for RIGHT and FULL OUTER JOINS
-    if (((type == joinDesc.RIGHT_OUTER_JOIN) || (type == joinDesc.FULL_OUTER_JOIN)) 
-        && !newObjNull && (inputNulls == null)) { 
+    if (((type == joinDesc.RIGHT_OUTER_JOIN) || (type == joinDesc.FULL_OUTER_JOIN))
+        && !newObjNull && (inputNulls == null)) {
       boolean[] newNulls = new boolean[intObj.getCurSize()];
       for (int i = 0; i < newNulls.length - 1; i++)
         newNulls[i] = true;
-      newNulls[newNulls.length-1] = false;
+      newNulls[newNulls.length - 1] = false;
       resNulls.add(newNulls);
       return resNulls;
     }
@@ -380,41 +432,45 @@
     if (inputNulls == null)
       return null;
 
-    if (type == joinDesc.INNER_JOIN) 
-      return joinObjectsInnerJoin(resNulls, inputNulls, newObj, intObj, left, newObjNull);
-    else if (type == joinDesc.LEFT_OUTER_JOIN) 
-      return joinObjectsLeftOuterJoin(resNulls, inputNulls, newObj, intObj, left, newObjNull);
-    else if (type == joinDesc.RIGHT_OUTER_JOIN) 
-      return joinObjectsRightOuterJoin(resNulls, inputNulls, newObj, intObj, left, newObjNull);
+    if (type == joinDesc.INNER_JOIN)
+      return joinObjectsInnerJoin(resNulls, inputNulls, newObj, intObj, left,
+          newObjNull);
+    else if (type == joinDesc.LEFT_OUTER_JOIN)
+      return joinObjectsLeftOuterJoin(resNulls, inputNulls, newObj, intObj,
+          left, newObjNull);
+    else if (type == joinDesc.RIGHT_OUTER_JOIN)
+      return joinObjectsRightOuterJoin(resNulls, inputNulls, newObj, intObj,
+          left, newObjNull);
     assert (type == joinDesc.FULL_OUTER_JOIN);
-    return joinObjectsFullOuterJoin(resNulls, inputNulls, newObj, intObj, left, newObjNull);
+    return joinObjectsFullOuterJoin(resNulls, inputNulls, newObj, intObj, left,
+        newObjNull);
   }
-  
-  /* 
-   * genObject is a recursive function. For the inputs, a array of
-   * bitvectors is maintained (inputNulls) where each entry denotes whether
-   * the element is to be used or not (whether it is null or not). The size of
-   * the bitvector is same as the number of inputs under consideration 
-   * currently. When all inputs are accounted for, the output is forwared
-   * appropriately.
+
+  /*
+   * genObject is a recursive function. For the inputs, an array of bit
+   * vectors (inputNulls) is maintained, where each entry denotes whether the
+   * corresponding element is to be used or not (i.e. whether it is null). The
+   * size of each bit vector equals the number of inputs currently under
+   * consideration. Once all inputs are accounted for, the output is forwarded.
    */
-  private void genObject(Vector<boolean[]> inputNulls, int aliasNum, IntermediateObject intObj) 
-    throws HiveException {
+  private void genObject(Vector<boolean[]> inputNulls, int aliasNum,
+      IntermediateObject intObj) throws HiveException {
     if (aliasNum < numValues) {
       Iterator<ArrayList<Object>> aliasRes = storage.get(order[aliasNum])
-        .iterator();
+          .iterator();
       iterators.push(aliasRes);
       while (aliasRes.hasNext()) {
         ArrayList<Object> newObj = aliasRes.next();
         intObj.pushObj(newObj);
-        Vector<boolean[]> newNulls = joinObjects(inputNulls, newObj, intObj, aliasNum);
+        Vector<boolean[]> newNulls = joinObjects(inputNulls, newObj, intObj,
+            aliasNum);
         genObject(newNulls, aliasNum + 1, intObj);
         intObj.popObj();
       }
       iterators.pop();
-    }
-    else {
-      if (inputNulls == null) return;
+    } else {
+      if (inputNulls == null)
+        return;
       Iterator<boolean[]> nullsIter = inputNulls.iterator();
       while (nullsIter.hasNext()) {
         boolean[] nullsVec = nullsIter.next();
@@ -429,29 +485,27 @@
    * @throws HiveException
    */
   public void endGroup() throws HiveException {
-    try {
-      LOG.trace("Join Op: endGroup called: numValues=" + numValues);
+    LOG.trace("Join Op: endGroup called: numValues=" + numValues);
+    checkAndGenObject();
+  }
 
-      // does any result need to be emitted
-      for (int i = 0; i < numValues; i++) {
-        Byte alias = order[i];
-        if (storage.get(alias).iterator().hasNext() == false) {
-          if (noOuterJoin) {
-            LOG.trace("No data for alias=" + i);
-            return;
-          } else {
-            storage.put(alias, dummyObjVectors[i]);
-          }
+  private void checkAndGenObject() throws HiveException {
+    // does any result need to be emitted
+    for (int i = 0; i < numValues; i++) {
+      Byte alias = order[i];
+      if (storage.get(alias).iterator().hasNext() == false) {
+        if (noOuterJoin) {
+          LOG.trace("No data for alias=" + i);
+          return;
+        } else {
+          storage.put(alias, dummyObjVectors[i]);
         }
       }
-
-      LOG.trace("calling genObject");
-      genObject(null, 0, new IntermediateObject(new ArrayList[numValues], 0));
-      LOG.trace("called genObject");
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new HiveException(e);
     }
+
+    LOG.trace("calling genObject");
+    genObject(null, 0, new IntermediateObject(new ArrayList[numValues], 0));
+    LOG.trace("called genObject");
   }
 
   /**
@@ -462,6 +516,5 @@
     LOG.trace("Join Op close");
     super.close(abort);
   }
-}
-
 
+}
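
For readers following the JoinOperator hunks above: rows are buffered per join
operand for the current key, and when the buffer of the last operand reaches
joinEmitInterval, checkAndGenObject() emits partial results and that buffer is
cleared; earlier operands only trigger a warning, because discarding their rows
would break outer-join semantics. Below is a minimal, standalone sketch of that
buffering policy; the class and method names are illustrative, not part of Hive.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the joinEmitInterval policy: buffer rows per join operand
// and emit partial results once the last operand's buffer reaches the interval.
public class EmitIntervalSketch {
  private final int numOperands;
  private final int emitInterval;
  private final Map<Integer, List<Object[]>> storage =
      new HashMap<Integer, List<Object[]>>();

  public EmitIntervalSketch(int numOperands, int emitInterval) {
    this.numOperands = numOperands;
    this.emitInterval = emitInterval;
    for (int i = 0; i < numOperands; i++) {
      storage.put(i, new ArrayList<Object[]>());
    }
  }

  // Called once per incoming row, tagged with the operand (alias) it belongs to.
  public void process(int alias, Object[] row) {
    List<Object[]> buffer = storage.get(alias);
    if (buffer.size() == emitInterval) {
      if (alias == numOperands - 1) {
        // Input is grouped by join key and sorted by alias, so the earlier
        // operands are complete for this key: emit now and clear only the
        // last operand's buffer, which keeps outer-join semantics intact.
        emitJoinedRows();
        buffer.clear();
      } else {
        // Rows of earlier operands cannot safely be dropped; just warn.
        System.err.println("operand " + alias + " exceeded the emit interval");
      }
    }
    buffer.add(row);
  }

  private void emitJoinedRows() {
    // Placeholder for the cross-product / null-bitvector generation step.
  }
}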

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/LimitOperator.java Mon Nov 10 17:50:06 2008
@@ -19,16 +19,18 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.*;
+import java.util.HashMap;
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.plan.limitDesc;
-import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.conf.Configuration;
 
 /**
  * Limit operator implementation
- * Limits a subobject and passes that on.
+ * Limits the number of rows to be passed on.
  **/
 public class LimitOperator extends Operator<limitDesc> implements Serializable {
   private static final long serialVersionUID = 1L;
@@ -50,4 +52,5 @@
     else
       setDone(true);
   }
+
 }
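
The revised class comment describes the whole behaviour: the operator forwards
rows until the configured limit is reached and then marks itself done, as the
setDone(true) branch in the hunk shows. A rough standalone sketch of that
counting logic (names are illustrative, and forward() stands in for delivery to
child operators):

// Sketch: forward rows until 'limit' rows have been passed on, then stop.
public class LimitSketch {
  private final int limit;
  private int currCount = 0;
  private boolean done = false;

  public LimitSketch(int limit) {
    this.limit = limit;
  }

  public void process(Object row) {
    if (done) {
      return;                 // already past the limit
    }
    if (currCount < limit) {
      forward(row);           // pass the row on
      currCount++;
    } else {
      done = true;            // signal that no further rows are needed
    }
  }

  private void forward(Object row) {
    // Delivery to downstream consumers is omitted in this sketch.
  }
}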

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Mon Nov 10 17:50:06 2008
@@ -62,6 +62,11 @@
     
       String cmdLine = hadoopExec + " jar " + auxJars + " " + hiveJar + " org.apache.hadoop.hive.ql.exec.ExecDriver -plan " + planFile.toString() + " " + hiveConfArgs;
       
+      String files = ExecDriver.getRealFiles(conf);
+      if(!files.isEmpty()) {
+        cmdLine = cmdLine + " -files " + files;
+      }
+
       LOG.info("Executing: " + cmdLine);
       Process executor = Runtime.getRuntime().exec(cmdLine);
 

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Mon Nov 10 17:50:06 2008
@@ -20,8 +20,11 @@
 
 import java.io.Serializable;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -30,6 +33,9 @@
 import org.apache.hadoop.hive.ql.plan.loadTableDesc;
 import org.apache.hadoop.hive.ql.plan.moveWork;
 import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -108,6 +114,44 @@
         String mesg_detail = " from " + tbd.getSourceDir();
         console.printInfo(mesg, mesg_detail);
 
+        // Get the file format of the table
+        boolean tableIsSequenceFile = tbd.getTable().getInputFileFormatClass().equals(SequenceFileInputFormat.class);
+        // Get all files from the src directory
+        FileStatus [] dirs;
+        ArrayList<FileStatus> files;
+        try {
+          fs = FileSystem.get(db.getTable(tbd.getTable().getTableName()).getDataLocation(),
+              Hive.get().getConf());
+          dirs = fs.globStatus(new Path(tbd.getSourceDir()));
+          files = new ArrayList<FileStatus>();
+          for (int i=0; i<dirs.length; i++) {
+            files.addAll(Arrays.asList(fs.listStatus(dirs[i].getPath())));
+            // We only check one file, so exit the loop when we have at least one.
+            if (files.size()>0) break;
+          }
+        } catch (IOException e) {
+          throw new HiveException("addFiles: filesystem error in check phase", e);
+        }
+        // Check if the file format of the file matches that of the table.
+        if (files.size() > 0) {
+          int fileId = 0;
+          boolean fileIsSequenceFile = true;   
+          try {
+            SequenceFile.Reader reader = new SequenceFile.Reader(
+              fs, files.get(fileId).getPath(), conf);
+            reader.close();
+          } catch (IOException e) {
+            fileIsSequenceFile = false;
+          }
+          if (!fileIsSequenceFile && tableIsSequenceFile) {
+            throw new HiveException("Cannot load text files into a table stored as SequenceFile.");
+          }
+          if (fileIsSequenceFile && !tableIsSequenceFile) {
+            throw new HiveException("Cannot load SequenceFiles into a table stored as TextFile.");
+          }
+        }
+         
+
         if(tbd.getPartitionSpec().size() == 0) {
           db.loadTable(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(), tbd.getReplace());
         } else {
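
The pre-load check added above infers a file's format by attempting to open it
with SequenceFile.Reader: the constructor reads and validates the header, so a
plain text file fails with an IOException, and a mismatch with the table's
declared input format is then rejected before any data is moved. A condensed
sketch of that probe, using standard Hadoop APIs (the wrapping class is
illustrative):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

// Sketch: probe a single candidate file to see whether it is a SequenceFile.
public class FormatProbe {
  static boolean isSequenceFile(FileSystem fs, Path file, Configuration conf) {
    try {
      // The reader checks the 'SEQ' magic bytes in its constructor, so a
      // text file fails here with an IOException.
      SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
      reader.close();
      return true;
    } catch (IOException e) {
      return false;
    }
  }
}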

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Mon Nov 10 17:50:06 2008
@@ -22,6 +22,7 @@
 import java.io.*;
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.plan.mapredWork;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -31,6 +32,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.plan.explain;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 
 /**
  * Base operator implementation
@@ -42,6 +45,7 @@
   private static final long serialVersionUID = 1L;
   
   protected List<Operator<? extends Serializable>> childOperators;
+  protected List<Operator<? extends Serializable>> parentOperators;
 
   public Operator() {}
 
@@ -53,6 +57,14 @@
     return childOperators;
   }
 
+  public void setParentOperators(List<Operator<? extends Serializable>> parentOperators) {
+    this.parentOperators = parentOperators;
+  }
+
+  public List<Operator<? extends Serializable>> getParentOperators() {
+    return parentOperators;
+  }
+
   protected String id;
   protected T conf;
   protected boolean done;
@@ -277,4 +289,22 @@
     }    
   }
 
+  public List<String> mergeColListsFromChildren(List<String> colList, 
+                                        HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx) {
+    return colList;
+  }
+
+  public List<String> genColLists(HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx) 
+    throws SemanticException {
+    List<String> colList = new ArrayList<String>();
+    if (childOperators != null)
+      for(Operator<? extends Serializable> o: childOperators)
+        colList = Utilities.mergeUniqElems(colList, o.genColLists(opParseCtx));
+
+    List<String> cols = mergeColListsFromChildren(colList, opParseCtx);
+    OpParseContext ctx = opParseCtx.get(this);
+    ctx.setColNames(cols);
+    return cols;
+  }
+
 }
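
genColLists above performs a bottom-up pass for column pruning: each operator
recursively gathers the column lists of its child operators, merges them
without duplicates (Utilities.mergeUniqElems), lets mergeColListsFromChildren
adjust the result (the base implementation passes it through unchanged), and
caches the outcome in its OpParseContext. A toy version of the same recursion
over a generic tree, with illustrative names only:

import java.util.ArrayList;
import java.util.List;

// Toy sketch: bottom-up merge of required column names over an operator-like tree.
class ColNode {
  List<ColNode> children = new ArrayList<ColNode>();
  List<String> ownCols = new ArrayList<String>();   // columns this node itself needs
  List<String> neededCols;                          // filled in by collectCols()

  List<String> collectCols() {
    List<String> cols = new ArrayList<String>();
    for (ColNode child : children) {
      mergeUnique(cols, child.collectCols());       // like Utilities.mergeUniqElems
    }
    mergeUnique(cols, ownCols);                     // like an operator-specific override
    neededCols = cols;                              // analogous to ctx.setColNames(cols)
    return cols;
  }

  private static void mergeUnique(List<String> dest, List<String> src) {
    for (String c : src) {
      if (!dest.contains(c)) {
        dest.add(c);
      }
    }
  }
}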

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Mon Nov 10 17:50:06 2008
@@ -109,7 +109,14 @@
       children.add(ret);
       op.setChildOperators(children);
     }
+
+    // add parents for the newly created operator
+    List<Operator<? extends Serializable>> parent = new ArrayList<Operator<? extends Serializable>>();
+    for(Operator op: oplist)
+      parent.add(op);
     
+    ret.setParentOperators(parent);
+
     return (ret);
   }
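
With the parentOperators field added to Operator, the factory now records both
directions of each DAG edge: the freshly created operator gets every operator
in oplist as a parent, and each of those already gets it as a child a few lines
earlier. A small sketch of the symmetric wiring, using the getters and setters
visible in the diffs above (the helper class itself is hypothetical):

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.Operator;

// Sketch: wire both directions of the operator DAG when attaching a new child.
class DagWiring {
  static void attachChild(Operator<? extends Serializable> child,
      List<Operator<? extends Serializable>> parents) {
    for (Operator<? extends Serializable> p : parents) {
      List<Operator<? extends Serializable>> children = p.getChildOperators();
      if (children == null) {
        children = new ArrayList<Operator<? extends Serializable>>();
        p.setChildOperators(children);
      }
      children.add(child);
    }
    // Copy so later changes to 'parents' do not leak into the child's list.
    child.setParentOperators(
        new ArrayList<Operator<? extends Serializable>>(parents));
  }
}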
 


