hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zs...@apache.org
Subject svn commit: r697291 [11/31] - in /hadoop/core/trunk: ./ src/contrib/hive/cli/src/java/org/apache/hadoop/hive/cli/ src/contrib/hive/metastore/if/ src/contrib/hive/metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/ src/contrib/hive/metastor...
Date Fri, 19 Sep 2008 23:56:35 GMT
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java Fri Sep 19 16:56:30 2008
@@ -22,29 +22,43 @@
 import java.util.*;
 
 import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.io.WritableHiveObject;
-import org.apache.hadoop.hive.ql.io.WritableComparableHiveObject;
 
 
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.exec.ExecMapper.reportStats;
+import org.apache.hadoop.hive.serde2.ColumnSet;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.MetadataListStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
 
 public class ExecReducer extends MapReduceBase implements Reducer {
 
   private JobConf jc;
-  private OutputCollector oc;
-  private Operator reducer;
+  private OutputCollector<?,?> oc;
+  private Operator<?> reducer;
   private Reporter rp;
   private boolean abort = false;
   private boolean isTagged = false;
-  private final HiveObject [] tagObjects =  new HiveObject [Byte.MAX_VALUE];
 
   private static String [] fieldNames;
   public static final Log l4j = LogFactory.getLog("ExecReducer");
 
+  // TODO: move to DynamicSerDe when it's ready
+  private Deserializer inputKeyDeserializer;
+  // Input value serde needs to be an array to support different SerDe 
+  // for different tags
+  private SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE];
   static {
     ArrayList<String> fieldNameArray =  new ArrayList<String> ();
     for(Utilities.ReduceField r: Utilities.ReduceField.values()) {
@@ -53,20 +67,37 @@
     fieldNames = fieldNameArray.toArray(new String [0]);
   }
 
-
   public void configure(JobConf job) {
     jc = job;
     mapredWork gWork = Utilities.getMapRedWork(job);
     reducer = gWork.getReducer();
     reducer.setMapredWork(gWork);
     isTagged = gWork.getNeedsTagging();
-
-    // create a hive object to encapsulate each one of the potential tags      
-    for(int i=0; i<Byte.MAX_VALUE; i++) {
-      tagObjects[i] = new PrimitiveHiveObject(Byte.valueOf((byte)i));
-    }
+    try {
+      // We should initialize the SerDe with the TypeInfo when available.
+      tableDesc keyTableDesc = PlanUtils.getReduceKeyDesc(gWork);
+      inputKeyDeserializer = (SerDe)ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), null);
+      inputKeyDeserializer.initialize(null, keyTableDesc.getProperties());
+      for(int tag=0; tag<Byte.MAX_VALUE; tag++) {
+        // We should initialize the SerDe with the TypeInfo when available.
+        tableDesc valueTableDesc = PlanUtils.getReduceValueDesc(gWork, tag);
+        inputValueDeserializer[tag] = (SerDe)ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), null);
+        inputValueDeserializer[tag].initialize(null, valueTableDesc.getProperties());
+      }
+    } catch (SerDeException e) {
+      throw new RuntimeException(e);
+    }    
   }
 
+  private Object keyObject;
+  private ObjectInspector keyObjectInspector;
+  private Object[] valueObject = new Object[Byte.MAX_VALUE];
+  private ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
+  private ObjectInspector[] rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
+  
+  private BytesWritable groupKey;
+  
+  ArrayList<Object> row = new ArrayList<Object>(3);
   public void reduce(Object key, Iterator values,
                      OutputCollector output,
                      Reporter reporter) throws IOException {
@@ -85,26 +116,65 @@
     }
 
     try {
-      // the key is either a WritableComparable or a NoTagWritableComparable
-      HiveObject keyObject = ((WritableComparableHiveObject)key).getHo();
-      //System.err.print(keyObject.toString());
-      // If a operator wants to do some work at the beginning of a group
-      reducer.startGroup();
-      while(values.hasNext()) {
-        WritableHiveObject who = (WritableHiveObject)values.next();
-       //System.err.print(who.getHo().toString());
-
-        LabeledCompositeHiveObject lho = new LabeledCompositeHiveObject(fieldNames);
-        lho.addHiveObject(keyObject);
-        lho.addHiveObject(who.getHo());
-        if(isTagged) {
-          lho.addHiveObject(tagObjects[who.getTag()]);
+      BytesWritable keyWritable = (BytesWritable)key;
+      byte tag = 0;
+      if (isTagged) {
+        // remove the tag
+        int size = keyWritable.getSize() - 1;
+        tag = keyWritable.get()[size]; 
+        keyWritable.setSize(size);
+      }
+      
+      if (!keyWritable.equals(groupKey)) {
+        // If an operator wants to do some work at the beginning of a group
+        if (groupKey == null) {
+          groupKey = new BytesWritable();
+        } else {
+          // If an operator wants to do some work at the end of a group
+          l4j.trace("End Group");
+          reducer.endGroup();
         }
-        reducer.process(lho);
+        groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
+        l4j.trace("Start Group");
+        reducer.startGroup();
+      }
+      try {
+        keyObject = inputKeyDeserializer.deserialize(keyWritable);
+      } catch (SerDeException e) {
+        throw new HiveException(e);
+      }
+      // This is a hack for generating the correct ObjectInspector.
+      // In the future, we should use DynamicSerde and initialize it using the type info. 
+      if (keyObjectInspector == null) {
+        // Directly create ObjectInspector here because we didn't know the number of cols till now.
+        keyObjectInspector = MetadataListStructObjectInspector.getInstance(((ColumnSet)keyObject).col.size()); 
+      }
+      // System.err.print(keyObject.toString());
+      while (values.hasNext()) {
+        Text valueText = (Text)values.next();
+        //System.err.print(who.getHo().toString());
+        try {
+          valueObject[tag] = inputValueDeserializer[tag].deserialize(valueText);
+        } catch (SerDeException e) {
+          throw new HiveException(e);
+        }
+        row.clear();
+        row.add(keyObject);
+        row.add(valueObject[tag]);
+        row.add(tag);
+        if (valueObjectInspector[tag] == null) {
+          // Directly create ObjectInspector here because we didn't know the number of cols till now.
+          valueObjectInspector[tag] = MetadataListStructObjectInspector.getInstance(((ColumnSet)valueObject[tag]).col.size());
+          ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+          ois.add(keyObjectInspector);
+          ois.add(valueObjectInspector[tag]);
+          ois.add(ObjectInspectorFactory.getStandardPrimitiveObjectInspector(Byte.class));
+          rowObjectInspector[tag] = ObjectInspectorFactory.getStandardStructObjectInspector(
+              Arrays.asList(fieldNames), ois);
+        }
+        reducer.process(row, rowObjectInspector[tag]);
       }
 
-      // If a operator wants to do some work at the end of a group
-      reducer.endGroup();
 
     } catch (HiveException e) {
       abort = true;
@@ -114,6 +184,11 @@
 
   public void close() {
     try {
+      if (groupKey != null) {
+        // If an operator wants to do some work at the end of a group
+        l4j.trace("End Group");
+        reducer.endGroup();
+      }
       reducer.close(abort);
       reportStats rps = new reportStats (rp);
       reducer.preorderMap(rps);

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java Fri Sep 19 16:56:30 2008
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.FileOutputStream;
+import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.Serializable;
 import java.lang.annotation.Annotation;
@@ -28,6 +29,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.ql.plan.explain;
 import org.apache.hadoop.hive.ql.plan.explainWork;
 import org.apache.hadoop.util.StringUtils;
@@ -43,9 +45,9 @@
   public int execute() {
     
     try {
-      // If this is an explain plan then return from here
-      PrintStream out = new PrintStream(new FileOutputStream(work.getResFile()));
-
+    	OutputStream outS = FileSystem.get(conf).create(work.getResFile());
+    	PrintStream out = new PrintStream(outS);
+    	
       // Print out the parse AST
       outputAST(work.getAstStringTree(), out, 0);
       out.println();
@@ -55,7 +57,8 @@
       
       // Go over all the tasks and dump out the plans
       outputStagePlans(out, work.getRootTasks(), 0);
-
+      out.close();
+      
       return (0);
     }
     catch (Exception e) {

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeColumnEvaluator.java Fri Sep 19 16:56:30 2008
@@ -20,25 +20,59 @@
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.exprNodeColumnDesc;
-import org.apache.hadoop.hive.serde.SerDeField;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
+/**
+ * This class supports multi-level fields like "a.b.c" for historical reasons.
+ */
 public class ExprNodeColumnEvaluator extends ExprNodeEvaluator {
 
   protected exprNodeColumnDesc expr;
-  transient SerDeField field;  
+  transient StructObjectInspector cachedRowInspector;
+  transient String[] fieldNames;
+  transient StructField[] fields;
+  transient ObjectInspector[] fieldsObjectInspector;
   
   public ExprNodeColumnEvaluator(exprNodeColumnDesc expr) {
     this.expr = expr;
   }
 
-  public Object evaluateToObject(HiveObject row)  throws HiveException {
-    return evaluate(row).getJavaObject();
+  public void evaluate(Object row, ObjectInspector rowInspector,
+      InspectableObject result) throws HiveException {
+    
+    assert(result != null);
+    // If this is the first row, or the dynamic structure of this row 
+    // is different from the previous row 
+    if (fields == null || cachedRowInspector != rowInspector) {
+      evaluateInspector(rowInspector);
+    }
+    result.o = cachedRowInspector.getStructFieldData(row, fields[0]);
+    for(int i=1; i<fields.length; i++) {
+      result.o = ((StructObjectInspector)fieldsObjectInspector[i-1]).getStructFieldData(
+          result.o, fields[i]);
+    }
+    result.oi = fieldsObjectInspector[fieldsObjectInspector.length - 1];
   }
 
-  public HiveObject evaluate(HiveObject row) throws HiveException {
-    if (field == null) {
-      field = row.getFieldFromExpression(expr.getColumn());
+  public ObjectInspector evaluateInspector(ObjectInspector rowInspector)
+      throws HiveException {
+    
+    if (fields == null || cachedRowInspector != rowInspector) {
+      cachedRowInspector = (StructObjectInspector)rowInspector;
+      fieldNames = expr.getColumn().split("\\.", -1);
+      fields = new StructField[fieldNames.length];
+      fieldsObjectInspector = new ObjectInspector[fieldNames.length];
+      fields[0] = cachedRowInspector.getStructFieldRef(fieldNames[0]);
+      fieldsObjectInspector[0] = fields[0].getFieldObjectInspector();
+      for (int i=1; i<fieldNames.length; i++) {
+        fields[i] = ((StructObjectInspector)fieldsObjectInspector[i-1]).getStructFieldRef(fieldNames[i]);
+        fieldsObjectInspector[i] = fields[i].getFieldObjectInspector();
+      }
     }
-    return row.get(field);
+    return fieldsObjectInspector[fieldsObjectInspector.length - 1];
   }
 }

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantEvaluator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantEvaluator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeConstantEvaluator.java Fri Sep 19 16:56:30 2008
@@ -20,20 +20,30 @@
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.exprNodeConstantDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 
 public class ExprNodeConstantEvaluator extends ExprNodeEvaluator {
 
   protected exprNodeConstantDesc expr;
-
+  transient ObjectInspector objectInspector;
+  
   public ExprNodeConstantEvaluator(exprNodeConstantDesc expr) {
     this.expr = expr;
+    objectInspector = ObjectInspectorFactory.getStandardPrimitiveObjectInspector(expr.getTypeInfo().getPrimitiveClass());
   }
 
-  public Object evaluateToObject(HiveObject row)  throws HiveException {
-    return expr.getValue();
+  public void evaluate(Object row, ObjectInspector rowInspector,
+      InspectableObject result) throws HiveException {
+    assert(result != null);
+    result.o = expr.getValue();
+    result.oi = objectInspector;
   }
 
-  public HiveObject evaluate(HiveObject r) throws HiveException {
-    return new PrimitiveHiveObject(evaluateToObject(r));
+  public ObjectInspector evaluateInspector(ObjectInspector rowInspector)
+      throws HiveException {
+    return objectInspector;
   }
 }

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeEvaluator.java Fri Sep 19 16:56:30 2008
@@ -19,16 +19,21 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
 public abstract class ExprNodeEvaluator {
+
   /**
-   * @return plain old java object
-   **/
-  public abstract Object evaluateToObject(HiveObject row) throws HiveException;
+   * Evaluate the expression given the row and rowInspector. 
+   * @param result   result.o and result.oi will be set inside the method.
+   */
+  public abstract void evaluate(Object row, ObjectInspector rowInspector, InspectableObject result) throws HiveException;
 
   /**
-   * @return encapsulated Hive Object
-   **/
-  public abstract HiveObject evaluate(HiveObject row) throws HiveException;
-  
+   * Metadata evaluation. Return the inspector for the expression, given the rowInspector.
+   * This method must return the same value as result.oi in evaluate(...) call with the same rowInspector.   
+   */
+  public abstract ObjectInspector evaluateInspector(ObjectInspector rowInspector) throws HiveException;
+
 }

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFieldEvaluator.java Fri Sep 19 16:56:30 2008
@@ -20,29 +20,59 @@
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.exprNodeFieldDesc;
-import org.apache.hadoop.hive.serde.SerDeField;
+
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 public class ExprNodeFieldEvaluator extends ExprNodeEvaluator {
 
   protected exprNodeFieldDesc desc;
-  transient ExprNodeEvaluator evaluator;
-  transient SerDeField field;
+  transient ExprNodeEvaluator leftEvaluator;
+  transient InspectableObject leftInspectableObject;
+  transient StructObjectInspector cachedLeftObjectInspector;
+  transient StructField field;
+  transient ObjectInspector fieldObjectInspector;
   
   public ExprNodeFieldEvaluator(exprNodeFieldDesc desc) {
     this.desc = desc;
-    evaluator = ExprNodeEvaluatorFactory.get(desc.getDesc());
+    leftEvaluator = ExprNodeEvaluatorFactory.get(desc.getDesc());
+    field = null;
+    leftInspectableObject = new InspectableObject();
   }
 
+  public void evaluate(Object row, ObjectInspector rowInspector,
+      InspectableObject result) throws HiveException {
+    
+    assert(result != null);
+    // Get the result in leftInspectableObject
+    leftEvaluator.evaluate(row, rowInspector, leftInspectableObject);
 
-  public Object evaluateToObject(HiveObject row)  throws HiveException {
-    return evaluate(row).getJavaObject();
+    if (field == null) {
+      cachedLeftObjectInspector = (StructObjectInspector)leftInspectableObject.oi;
+      field = cachedLeftObjectInspector.getStructFieldRef(desc.getFieldName());
+      fieldObjectInspector = field.getFieldObjectInspector();
+    } else {
+      assert(cachedLeftObjectInspector == leftInspectableObject.oi);
+    }
+    result.oi = fieldObjectInspector;
+    result.o = cachedLeftObjectInspector.getStructFieldData(leftInspectableObject.o, field); 
   }
 
-  public HiveObject evaluate(HiveObject row) throws HiveException {
-    HiveObject ho = evaluator.evaluate(row);
+  public ObjectInspector evaluateInspector(ObjectInspector rowInspector)
+      throws HiveException {
+    // If this is the first row, or the dynamic structure of the evaluatorInspectableObject 
+    // is different from the previous row 
+    leftInspectableObject.oi = leftEvaluator.evaluateInspector(rowInspector);
     if (field == null) {
-      field = ho.getFieldFromExpression(desc.getFieldName());
+      cachedLeftObjectInspector = (StructObjectInspector)leftInspectableObject.oi;
+      field = cachedLeftObjectInspector.getStructFieldRef(desc.getFieldName());
+      fieldObjectInspector = field.getFieldObjectInspector();
+    } else {
+      assert(cachedLeftObjectInspector == leftInspectableObject.oi);      
     }
-    return ho.get(field);
+    return fieldObjectInspector;
   }
+
 }

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFuncEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFuncEvaluator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFuncEvaluator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeFuncEvaluator.java Fri Sep 19 16:56:30 2008
@@ -18,13 +18,16 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.util.ArrayList;
+import java.lang.reflect.Method;
 import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.exprNodeFuncDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.util.ReflectionUtils;
 
 public class ExprNodeFuncEvaluator extends ExprNodeEvaluator {
@@ -32,41 +35,60 @@
   private static final Log LOG = LogFactory.getLog(ExprNodeFuncEvaluator.class.getName());
   
   protected exprNodeFuncDesc expr;
-  transient ArrayList<ExprNodeEvaluator> evaluators;
-  transient Object[] children;
+  transient ExprNodeEvaluator[] paramEvaluators;
+  transient InspectableObject[] paramInspectableObjects;
+  transient Object[] paramValues;
   transient UDF udf;
+  transient Method udfMethod;
+  transient ObjectInspector outputObjectInspector;
   
   public ExprNodeFuncEvaluator(exprNodeFuncDesc expr) {
     this.expr = expr;
     assert(expr != null);
     Class<?> c = expr.getUDFClass();
-    LOG.info(c.toString());
+    udfMethod = expr.getUDFMethod();
+    LOG.debug(c.toString());
+    LOG.debug(udfMethod.toString());
     udf = (UDF)ReflectionUtils.newInstance(expr.getUDFClass(), null);
-    evaluators = new ArrayList<ExprNodeEvaluator>();
-    for(int i=0; i<expr.getChildren().size(); i++) {
-      evaluators.add(ExprNodeEvaluatorFactory.get(expr.getChildren().get(i)));
+    int paramNumber = expr.getChildren().size();
+    paramEvaluators = new ExprNodeEvaluator[paramNumber];
+    paramInspectableObjects  = new InspectableObject[paramNumber];
+    for(int i=0; i<paramNumber; i++) {
+      paramEvaluators[i] = ExprNodeEvaluatorFactory.get(expr.getChildren().get(i));
+      paramInspectableObjects[i] = new InspectableObject();
     }
-    children = new Object[expr.getChildren().size()];  
+    paramValues = new Object[expr.getChildren().size()];
+    outputObjectInspector = ObjectInspectorFactory.getStandardPrimitiveObjectInspector(
+        udfMethod.getReturnType());
   }
 
-  public Object evaluateToObject(HiveObject row)  throws HiveException {
+  public void evaluate(Object row, ObjectInspector rowInspector,
+      InspectableObject result) throws HiveException {
+    if (result == null) {
+      throw new HiveException("result cannot be null.");
+    }
     // Evaluate all children first
-    for(int i=0; i<evaluators.size(); i++) {
-      Object o = evaluators.get(i).evaluateToObject(row);
-      children[i] = o;
+    for(int i=0; i<paramEvaluators.length; i++) {
+      paramEvaluators[i].evaluate(row, rowInspector, paramInspectableObjects[i]);
+      paramValues[i] = paramInspectableObjects[i].o;
     }
     try {
-      return expr.getUDFMethod().invoke(udf, children);
+      result.o = udfMethod.invoke(udf, paramValues);
+      result.oi = outputObjectInspector;
     } catch (Exception e) {
-      throw new HiveException("Unable to execute UDF function " + udf.getClass() + " " 
-          + expr.getUDFMethod() + " on inputs " + "(" + children.length + ") " + Arrays.asList(children) + ": " + e.getMessage(), e);
+      if (e instanceof HiveException) {
+        throw (HiveException)e;
+      } else if (e instanceof RuntimeException) {
+        throw (RuntimeException)e;
+      } else {
+        throw new HiveException("Unable to execute UDF function " + udf.getClass() + " " 
+          + udfMethod + " on inputs " + "(" + paramValues.length + ") " + Arrays.asList(paramValues) + ": " + e.getMessage(), e);
+      }
     }
   }
 
-  public HiveObject evaluate(HiveObject row) throws HiveException {
-    Object obj = evaluateToObject(row);
-    if (obj == null)
-      return new NullHiveObject();
-    return new PrimitiveHiveObject(obj);
+  public ObjectInspector evaluateInspector(ObjectInspector rowInspector)
+      throws HiveException {
+    return outputObjectInspector;
   }
 }

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeIndexEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeIndexEvaluator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeIndexEvaluator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeIndexEvaluator.java Fri Sep 19 16:56:30 2008
@@ -20,14 +20,17 @@
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.exprNodeIndexDesc;
-import org.apache.hadoop.hive.serde.SerDeField;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
 public class ExprNodeIndexEvaluator extends ExprNodeEvaluator {
 
   protected exprNodeIndexDesc expr;
   transient ExprNodeEvaluator mainEvaluator;
+  transient InspectableObject mainInspectableObject = new InspectableObject();
   transient ExprNodeEvaluator indexEvaluator;
-  transient SerDeField field;
+  transient InspectableObject indexInspectableObject = new InspectableObject();
   
   public ExprNodeIndexEvaluator(exprNodeIndexDesc expr) {
     this.expr = expr;
@@ -35,18 +38,22 @@
     indexEvaluator = ExprNodeEvaluatorFactory.get(expr.getIndex());
   }
 
-  public Object evaluateToObject(HiveObject row)  throws HiveException {
-    return evaluate(row).getJavaObject();
+  public void evaluate(Object row, ObjectInspector rowInspector,
+      InspectableObject result) throws HiveException {
+    
+    assert(result != null);
+    mainEvaluator.evaluate(row, rowInspector, mainInspectableObject);
+    indexEvaluator.evaluate(row, rowInspector, indexInspectableObject);
+    int index = ((Number)indexInspectableObject.o).intValue();
+    
+    ListObjectInspector loi = (ListObjectInspector)mainInspectableObject.oi;
+    result.oi = loi.getListElementObjectInspector();
+    result.o = loi.getListElement(mainInspectableObject.o, index);
   }
 
-  public HiveObject evaluate(HiveObject row) throws HiveException {
-    HiveObject ho = mainEvaluator.evaluate(row);
-    if (field == null || !(indexEvaluator instanceof ExprNodeConstantEvaluator)) {
-      // TODO: This optimization is wrong because of the field implementation inside HiveObject.
-      // The problem is that at the second "[" (after "c"), "field" caches both "index1" and 
-      // "index2" in "a.b[index1].c[index2]", while it should only cache "index2".
-      field = ho.getFieldFromExpression("[" + indexEvaluator.evaluateToObject(row) + "]");
-    }
-    return ho.get(field);
+  public ObjectInspector evaluateInspector(ObjectInspector rowInspector)
+      throws HiveException {
+    return ((ListObjectInspector)mainEvaluator.evaluateInspector(rowInspector)).getListElementObjectInspector();
   }
+
 }

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeNullEvaluator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeNullEvaluator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeNullEvaluator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExprNodeNullEvaluator.java Fri Sep 19 16:56:30 2008
@@ -20,6 +20,8 @@
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.exprNodeNullDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
 // This function will not be used currently, since the function expressions
 // change the void to the first matching argument
@@ -31,11 +33,14 @@
     this.expr = expr;
   }
 
-  public Object evaluateToObject(HiveObject row)  throws HiveException {
-    return expr.getValue();
+  public void evaluate(Object row, ObjectInspector rowInspector,
+      InspectableObject result) throws HiveException {
+    throw new HiveException("Hive 2 Internal exception: should not reach here.");
   }
 
-  public HiveObject evaluate(HiveObject r) throws HiveException {
-    return new NullHiveObject();
+  public ObjectInspector evaluateInspector(ObjectInspector rowInspector)
+      throws HiveException {
+    throw new HiveException("Hive 2 Internal exception: should not reach here.");
   }
+
 }

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java Fri Sep 19 16:56:30 2008
@@ -22,6 +22,8 @@
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.extractDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -31,13 +33,15 @@
 public class ExtractOperator extends Operator<extractDesc> implements Serializable {
   private static final long serialVersionUID = 1L;
   transient protected ExprNodeEvaluator eval;
+  transient protected InspectableObject result = new InspectableObject();
 
   public void initialize(Configuration hconf) throws HiveException {
     super.initialize(hconf);
     eval = ExprNodeEvaluatorFactory.get(conf.getCol());
   }
 
-  public void process(HiveObject r) throws HiveException {
-    forward (eval.evaluate(r));
+  public void process(Object row, ObjectInspector rowInspector) throws HiveException {
+    eval.evaluate(row, rowInspector, result);
+    forward(result.o, result.oi);
   }
 }

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Fri Sep 19 16:56:30 2008
@@ -23,16 +23,14 @@
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.mapred.*;
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.util.*;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.fileSinkDesc;
 import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
-import org.apache.hadoop.hive.serde.SerDe;
-import org.apache.hadoop.hive.serde.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
 /**
  * File Sink operator implementation
@@ -49,7 +47,7 @@
   transient protected FileSystem fs;
   transient protected Path outPath;
   transient protected Path finalPath;
-  transient protected SerDe serDe;
+  transient protected Serializer serializer;
   transient protected BytesWritable commonKey = new BytesWritable();
   
   private void commit() throws IOException {
@@ -79,13 +77,31 @@
   public void initialize(Configuration hconf) throws HiveException {
     super.initialize(hconf);
     try {
+      serializer = (Serializer)conf.getTableInfo().getDeserializerClass().newInstance();
+      serializer.initialize(null, conf.getTableInfo().getProperties());
+      
+      JobConf jc;
+      if(hconf instanceof JobConf) {
+        jc = (JobConf)hconf;
+      } else {
+        // test code path
+        jc = new JobConf(hconf, ExecDriver.class);
+      }
+
       fs = FileSystem.get(hconf);
       finalPath = new Path(conf.getDirName(), Utilities.getTaskId(hconf));
       outPath = new Path(conf.getDirName(), "_tmp."+Utilities.getTaskId(hconf));
-      OutputFormat outputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
+      OutputFormat<?, ?> outputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
+      final Class<? extends Writable> outputClass = serializer.getSerializedClass();
+      boolean isCompressed = FileOutputFormat.getCompressOutput(jc);
 
+      // The reason to keep these instead of using OutputFormat.getRecordWriter() is that
+      // getRecordWriter does not give us enough control over the file name that we create.
       if(outputFormat instanceof IgnoreKeyTextOutputFormat) {
-        final FSDataOutputStream outStream = fs.create(outPath);
+        if(isCompressed) {
+          finalPath = new Path(conf.getDirName(), Utilities.getTaskId(hconf) + ".gz");
+        }
+        final OutputStream outStream = Utilities.createCompressedStream(jc, fs.create(outPath));
         outWriter = new RecordWriter () {
             public void write(Writable r) throws IOException {
               Text tr = (Text)r;
@@ -98,7 +114,7 @@
           };
       } else if (outputFormat instanceof SequenceFileOutputFormat) {
         final SequenceFile.Writer outStream =
-          SequenceFile.createWriter(fs, hconf, outPath, BytesWritable.class, Text.class);
+            Utilities.createSequenceWriter(jc, fs, outPath, BytesWritable.class, outputClass);
         outWriter = new RecordWriter () {
             public void write(Writable r) throws IOException {
               outStream.append(commonKey, r);
@@ -109,20 +125,22 @@
           };
       } else {
         // should never come here - we should be catching this in ddl command
-        assert(false);
+        throw new HiveException ("Illegal outputformat: " + outputFormat.getClass().getName());
       }
-      serDe = conf.getTableInfo().getSerdeClass().newInstance();
+    } catch (HiveException e) {
+      throw e;
     } catch (Exception e) {
       e.printStackTrace();
       throw new HiveException(e);
     }
   }
 
-  public void process(HiveObject r) throws HiveException {
+  Writable recordValue; 
+  public void process(Object row, ObjectInspector rowInspector) throws HiveException {
     try {
       // use SerDe to serialize the row, and write it out
-      Writable value = serDe.serialize(r.getJavaObject());
-      outWriter.write(value);
+      recordValue = serializer.serialize(row, rowInspector);
+      outWriter.write(recordValue);
     } catch (IOException e) {
       throw new HiveException (e);
     } catch (SerDeException e) {

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Fri Sep 19 16:56:30 2008
@@ -23,6 +23,8 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.hive.ql.plan.filterDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -33,18 +35,20 @@
   private static final long serialVersionUID = 1L;
   public static enum Counter {FILTERED, PASSED}
   transient private final LongWritable filtered_count, passed_count;
-  transient private ExprNodeEvaluator eval;
-
+  transient private ExprNodeEvaluator conditionEvaluator;
+  transient private InspectableObject conditionInspectableObject;  
+  
   public FilterOperator () {
     super();
     filtered_count = new LongWritable();
     passed_count = new LongWritable();
+    conditionInspectableObject = new InspectableObject();
   }
 
   public void initialize(Configuration hconf) throws HiveException {
     super.initialize(hconf);
     try {
-      this.eval = ExprNodeEvaluatorFactory.get(conf.getPredicate());
+      this.conditionEvaluator = ExprNodeEvaluatorFactory.get(conf.getPredicate());
       statsMap.put(Counter.FILTERED, filtered_count);
       statsMap.put(Counter.PASSED, passed_count);
     } catch (Throwable e) {
@@ -53,11 +57,12 @@
     }
   }
 
-  public void process(HiveObject r) throws HiveException {
+  public void process(Object row, ObjectInspector rowInspector) throws HiveException {
     try {
-      Boolean ret = (Boolean)(eval.evaluateToObject(r));
+      conditionEvaluator.evaluate(row, rowInspector, conditionInspectableObject);
+      Boolean ret = (Boolean)(conditionInspectableObject.o);
       if (Boolean.TRUE.equals(ret)) {
-        forward(r);
+        forward(row, rowInspector);
         passed_count.set(passed_count.get()+1);
       } else {
         filtered_count.set(filtered_count.get()+1);
@@ -65,9 +70,7 @@
     } catch (ClassCastException e) {
       e.printStackTrace();
       throw new HiveException("Non Boolean return Object type: " +
-                              eval.evaluateToObject(r).getClass().getName());
-    } catch (NullPointerException e) {
-      throw new HiveException("NullPointerException in FilterOperator ", e);
+          conditionInspectableObject.o.getClass().getName());
     }
   }
 }

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java Fri Sep 19 16:56:30 2008
@@ -22,6 +22,7 @@
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.forwardDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -35,7 +36,9 @@
     // nothing to do really ..
   }
 
-  public void process(HiveObject r) throws HiveException {
-    forward(r);
+  @Override
+  public void process(Object row, ObjectInspector rowInspector)
+      throws HiveException {
+    forward(row, rowInspector);    
   }
 }

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Fri Sep 19 16:56:30 2008
@@ -29,8 +29,8 @@
 import java.lang.Void;
 
 import org.apache.hadoop.hive.ql.exec.FunctionInfo.OperatorType;
-import org.apache.hadoop.hive.ql.parse.TypeInfo;
 import org.apache.hadoop.hive.ql.udf.*;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 
 public class FunctionRegistry {
 
@@ -252,7 +252,7 @@
 
         for(int i=0; i<argumentClasses.size() && match; i++) {
           if (argumentClasses.get(i) == Void.class) continue;
-          Class<?> accepted = TypeInfo.generalizePrimitive(argumentTypeInfos[i]);
+          Class<?> accepted = ObjectInspectorUtils.generalizePrimitive(argumentTypeInfos[i]);
           if (accepted.isAssignableFrom(argumentClasses.get(i))) {
             // do nothing if match
           } else if (!exact && implicitConvertable(argumentClasses.get(i), accepted)) {

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Fri Sep 19 16:56:30 2008
@@ -28,7 +28,9 @@
 import org.apache.hadoop.hive.ql.plan.aggregationDesc;
 import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.groupByDesc;
-import org.apache.hadoop.hive.serde.SerDeField;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.conf.Configuration;
 
 /**
@@ -47,15 +49,18 @@
   transient protected Method[] aggregationsAggregateMethods;
   transient protected Method[] aggregationsEvaluateMethods;
 
-  transient protected List<SerDeField> choKeyFields;
+  transient protected ArrayList<ObjectInspector> objectInspectors;
+  transient protected ObjectInspector outputObjectInspector;
 
   // Used by sort-based GroupBy: Mode = COMPLETE, PARTIAL1, PARTIAL2
-  transient protected CompositeHiveObject currentKeys;
+  transient protected ArrayList<Object> currentKeys;
   transient protected UDAF[] aggregations;
   transient protected Object[][] aggregationsParametersLastInvoke;
 
   // Used by hash-based GroupBy: Mode = HASH
-  transient protected HashMap<CompositeHiveObject, UDAF[]> hashAggregations;
+  transient protected HashMap<ArrayList<Object>, UDAF[]> hashAggregations;
+  
+  transient boolean firstRow;
   
   public void initialize(Configuration hconf) throws HiveException {
     super.initialize(hconf);
@@ -124,8 +129,20 @@
         aggregationsParametersLastInvoke = new Object[conf.getAggregators().size()][];
         aggregations = newAggregations();
       } else {
-        hashAggregations = new HashMap<CompositeHiveObject, UDAF[]>();
+        hashAggregations = new HashMap<ArrayList<Object>, UDAF[]>();
+      }
+      // init objectInspectors
+      int totalFields = keyFields.length + aggregationClasses.length;
+      objectInspectors = new ArrayList<ObjectInspector>(totalFields);
+      for(int i=0; i<keyFields.length; i++) {
+        objectInspectors.add(null);
       }
+      for(int i=0; i<aggregationClasses.length; i++) {
+        objectInspectors.add(ObjectInspectorFactory.getStandardPrimitiveObjectInspector(
+            aggregationsEvaluateMethods[i].getReturnType()));
+      }
+      
+      firstRow = true;
     } catch (Exception e) {
       e.printStackTrace();
       throw new RuntimeException(e);
@@ -141,12 +158,15 @@
     return aggs;
   }
 
-  protected void updateAggregations(UDAF[] aggs, HiveObject row, Object[][] lastInvoke) throws Exception {
+  InspectableObject tempInspectableObject = new InspectableObject();
+  
+  protected void updateAggregations(UDAF[] aggs, Object row, ObjectInspector rowInspector, Object[][] lastInvoke) throws Exception {
     for(int ai=0; ai<aggs.length; ai++) {
       // Calculate the parameters 
       Object[] o = new Object[aggregationParameterFields[ai].length];
       for(int pi=0; pi<aggregationParameterFields[ai].length; pi++) {
-        o[pi] = aggregationParameterFields[ai][pi].evaluateToObject(row);
+        aggregationParameterFields[ai][pi].evaluate(row, rowInspector, tempInspectableObject);
+        o[pi] = tempInspectableObject.o; 
       }
       // Update the aggregations.
       if (aggregationIsDistinct[ai] && lastInvoke != null) {
@@ -170,36 +190,48 @@
     }
   }
   
-  public void process(HiveObject row) throws HiveException {
+  public void process(Object row, ObjectInspector rowInspector) throws HiveException {
     
     try {
       // Compute the keys
-      ArrayList<HiveObject> keys = new ArrayList<HiveObject>(keyFields.length);
+      ArrayList<Object> newKeys = new ArrayList<Object>(keyFields.length);
       for (int i = 0; i < keyFields.length; i++) {
-        keys.add(keyFields[i].evaluate(row));
+        keyFields[i].evaluate(row, rowInspector, tempInspectableObject);
+        newKeys.add(tempInspectableObject.o);
+        if (firstRow) {
+          objectInspectors.set(i, tempInspectableObject.oi);
+        }
+      }
+      if (firstRow) {
+        firstRow = false;
+        ArrayList<String> fieldNames = new ArrayList<String>(objectInspectors.size());
+        for(int i=0; i<objectInspectors.size(); i++) {
+          fieldNames.add(Integer.valueOf(i).toString());
+        }
+        outputObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+          fieldNames, objectInspectors);
       }
-      CompositeHiveObject newKeys = new CompositeHiveObject(keys); 
-      
       // Prepare aggs for updating
       UDAF[] aggs = null;
       Object[][] lastInvoke = null;
       if (aggregations != null) {
         // sort-based aggregation
         // Need to forward?
-        if (currentKeys != null && !newKeys.equals(currentKeys)) {
-            forward(currentKeys, aggregations);
+        boolean keysAreEqual = newKeys.equals(currentKeys);
+        if (currentKeys != null && !keysAreEqual) {
+          forward(currentKeys, aggregations);
         }
         // Need to update the keys?
-        if (currentKeys == null || !newKeys.equals(currentKeys)) {
-            currentKeys = newKeys;
-            // init aggregations
-            for(UDAF aggregation: aggregations) {
-                aggregation.init();
-            }
-            // clear parameters in last-invoke
-            for(int i=0; i<aggregationsParametersLastInvoke.length; i++) {
-              aggregationsParametersLastInvoke[i] = null;
-            }
+        if (currentKeys == null || !keysAreEqual) {
+          currentKeys = newKeys;
+          // init aggregations
+          for(UDAF aggregation: aggregations) {
+            aggregation.init();
+          }
+          // clear parameters in last-invoke
+          for(int i=0; i<aggregationsParametersLastInvoke.length; i++) {
+            aggregationsParametersLastInvoke[i] = null;
+          }
         }
         aggs = aggregations;
         lastInvoke = aggregationsParametersLastInvoke;
@@ -215,7 +247,7 @@
       }
 
       // Update the aggs
-      updateAggregations(aggs, row, lastInvoke);
+      updateAggregations(aggs, row, rowInspector, lastInvoke);
 
     } catch (Exception e) {
       e.printStackTrace();
@@ -230,23 +262,16 @@
    *          The keys in the record
    * @throws HiveException
    */
-  protected void forward(CompositeHiveObject keys, UDAF[] aggs) throws Exception {
-    if (choKeyFields == null) {
-      // init choKeyFields
-      choKeyFields = new ArrayList<SerDeField>();
-      for (int i = 0; i < keyFields.length; i++) {
-        choKeyFields.add(keys.getFieldFromExpression(Integer.valueOf(i).toString()));
-      }
+  protected void forward(ArrayList<Object> keys, UDAF[] aggs) throws Exception {
+    int totalFields = keys.size() + aggs.length;
+    List<Object> a = new ArrayList<Object>(totalFields);
+    for(int i=0; i<keys.size(); i++) {
+      a.add(keys.get(i));
     }
-    int totalFields = keys.width + aggs.length;
-    CompositeHiveObject cho = new CompositeHiveObject(totalFields);
-    for (int i = 0; i < keys.width; i++) {
-      cho.addHiveObject(keys.get(choKeyFields.get(i)));
+    for(int i=0; i<aggs.length; i++) {
+      a.add(aggregationsEvaluateMethods[i].invoke(aggs[i]));
     }
-    for (int i = 0; i < aggs.length; i++) {
-      cho.addHiveObject(new PrimitiveHiveObject(aggregationsEvaluateMethods[i].invoke(aggs[i])));
-    }
-    forward(cho);
+    forward(a, outputObjectInspector);
   }
   
   /**
@@ -263,7 +288,7 @@
           }
         } else if (hashAggregations != null) {
           // hash-based aggregations
-          for (CompositeHiveObject key: hashAggregations.keySet()) {
+          for (ArrayList<Object> key: hashAggregations.keySet()) {
             forward(key, hashAggregations.get(key));
           }
         } else {
@@ -278,4 +303,5 @@
     }
     super.close(abort);
   }
+
 }

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveObject.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveObject.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveObject.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/HiveObject.java Fri Sep 19 16:56:30 2008
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import java.io.*;
-import java.util.*;
-
-import org.apache.hadoop.hive.serde.*;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.io.*;
-import org.apache.hadoop.hive.utils.ByteStream;
-
-/**
- * Data for each row is passed around as HiveObjects in Hive
- */
-
-public abstract class HiveObject {
-
-  protected Object javaObject;
-
-  protected boolean isNull;
-
-  /**
-   * @param expr a well formed expression nesting within this Hive Object
-   * @return field handler that can be used in a subsequent get() call
-   */
-  public abstract SerDeField getFieldFromExpression(String expr) throws HiveException;
-
-  /**
-   * @param field obtained using call to getFieldFromExpression
-   * @return another subObject
-   */
-  public abstract HiveObject get(SerDeField field) throws HiveException;
-
-  /**
-   * @return get the current HiveObject as a Java Object
-   */
-  public Object getJavaObject() throws HiveException {
-    if (isNull) return null;
-    return javaObject;
-  }
-
-  /**
-   * @return get isNull
-   */
-  public boolean getIsNull() {
-    return isNull;
-  }
-
-  public void setIsNull(boolean isNull) {
-    this.isNull = isNull;
-  }
-
-  /**
-   * @return list of top level fields in this Hive Object
-   */
-  public abstract List<SerDeField> getFields() throws HiveException;
-
-  /**
-   * Used to detect base case of object hierarchy
-   * @return true if the Object encapsulates a Hive Primitive Object. False otherwise
-   */
-  public abstract boolean isPrimitive();
-
-  public abstract int hashCode();
-  
-  public abstract boolean equals(Object other);
-  
-  public String toString () {
-    try {
-      HiveObjectSerializer hos = new NaiiveSerializer();
-      ByteStream.Output bos = new ByteStream.Output ();
-      hos.serialize(this, new DataOutputStream(bos));
-      return new String(bos.getData(), 0, bos.getCount(), "UTF-8");
-    } catch (Exception e) {
-      return ("Exception:  "+e.getMessage());
-    }
-  }
-
-  public static final ArrayList<SerDeField> nlist = new ArrayList<SerDeField> (0);
-}

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Fri Sep 19 16:56:30 2008
@@ -22,7 +22,6 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.Stack;
 import java.util.Vector;
@@ -32,8 +31,11 @@
 import org.apache.hadoop.hive.ql.plan.exprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.joinCond;
 import org.apache.hadoop.hive.ql.plan.joinDesc;
-import org.apache.hadoop.hive.serde.SerDeField;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 
 /**
  * Join operator implementation.
@@ -43,35 +45,29 @@
   // a list of value expressions for each alias are maintained 
   public static class JoinExprMap {
     ExprNodeEvaluator[] valueFields;
-    List<SerDeField> listFields;
 
-    public JoinExprMap(ExprNodeEvaluator[] valueFields,
-        List<SerDeField> listFields) {
+    public JoinExprMap(ExprNodeEvaluator[] valueFields) {
       this.valueFields = valueFields;
-      this.listFields = listFields;
     }
 
     public ExprNodeEvaluator[] getValueFields() {
       return valueFields;
     }
 
-    public List<SerDeField> getListFields() {
-      return listFields;
-    }
   }
 
   public static class IntermediateObject{
-    CompositeHiveObject[] objs;
+    ArrayList<Object>[] objs;
     int curSize;
 
-    public IntermediateObject(CompositeHiveObject[] objs, int curSize) {
+    public IntermediateObject(ArrayList<Object>[] objs, int curSize) {
       this.objs  = objs;
       this.curSize = curSize;
     }
 
-    public CompositeHiveObject[] getObjs() { return objs; }
+    public ArrayList<Object>[] getObjs() { return objs; }
     public int getCurSize() { return curSize; }
-    public void pushObj(CompositeHiveObject obj) { objs[curSize++] = obj; }
+    public void pushObj(ArrayList<Object> obj) { objs[curSize++] = obj; }
     public void popObj() { curSize--; }
   }
 
@@ -81,23 +77,24 @@
   transient static protected Byte[] order; // order in which the results should be outputted
   transient protected joinCond[] condn;
   transient protected boolean noOuterJoin;
-  transient private HiveObject[] dummyObj; // for outer joins, contains the potential nulls for the concerned aliases
-  transient private Vector<CompositeHiveObject>[] dummyObjVectors;
-  transient private Stack<Iterator<CompositeHiveObject>> iterators;
+  transient private Object[] dummyObj; // for outer joins, contains the potential nulls for the concerned aliases
+  transient private Vector<ArrayList<Object>>[] dummyObjVectors;
+  transient private Stack<Iterator<ArrayList<Object>>> iterators;
   transient private int totalSz; // total size of the composite object
-
+  transient ObjectInspector joinOutputObjectInspector;
+  
   static
   {
     aliasField = ExprNodeEvaluatorFactory.get(new exprNodeColumnDesc(String.class, Utilities.ReduceField.ALIAS.toString()));
   }
   
-  HashMap<Byte, Vector<CompositeHiveObject>> storage;
+  HashMap<Byte, Vector<ArrayList<Object>>> storage;
 
   public void initialize(Configuration hconf) throws HiveException {
     super.initialize(hconf);
     totalSz = 0;
     // Map that contains the rows for each alias
-    storage = new HashMap<Byte, Vector<CompositeHiveObject>>();
+    storage = new HashMap<Byte, Vector<ArrayList<Object>>>();
     
     numValues = conf.getExprs().size();
     joinExprs = new HashMap<Byte, JoinExprMap>();
@@ -123,51 +120,61 @@
       for (int j = 0; j < sz; j++)
         valueFields[j] = ExprNodeEvaluatorFactory.get(expr.get(j));
 
-      joinExprs.put(key, new JoinExprMap(valueFields, CompositeHiveObject
-          .getFields(sz)));
+      joinExprs.put(key, new JoinExprMap(valueFields));
     }
 
-    dummyObj = new HiveObject[numValues];
+    ArrayList<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>(totalSz);
+    for(int i=0; i<totalSz; i++) {
+      structFieldObjectInspectors.add(ObjectInspectorFactory.getStandardPrimitiveObjectInspector(String.class));
+    }
+    joinOutputObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+        ObjectInspectorUtils.getIntegerArray(totalSz), structFieldObjectInspectors);
+
+    dummyObj = new Object[numValues];
     dummyObjVectors = new Vector[numValues];
 
     int pos = 0;
     for (Byte alias : order) {
       int sz = map.get(alias).size();
-      CompositeHiveObject nr = new CompositeHiveObject(sz);
+      ArrayList<Object> nr = new ArrayList<Object>(sz);
 
       for (int j = 0; j < sz; j++)
-        nr.addHiveObject(null);
+        nr.add(null);
 
       dummyObj[pos] = nr;
-      Vector<CompositeHiveObject> values = new Vector<CompositeHiveObject>();
-      values.add((CompositeHiveObject) dummyObj[pos]);
+      Vector<ArrayList<Object>> values = new Vector<ArrayList<Object>>();
+      values.add((ArrayList<Object>) dummyObj[pos]);
       dummyObjVectors[pos] = values;
       pos++;
     }
 
-    iterators = new Stack<Iterator<CompositeHiveObject>>();
+    iterators = new Stack<Iterator<ArrayList<Object>>>();
   }
 
   public void startGroup() throws HiveException {
-    l4j.trace("Join: Starting new group");
+    LOG.trace("Join: Starting new group");
     storage.clear();
     for (Byte alias : order)
-      storage.put(alias, new Vector<CompositeHiveObject>());
+      storage.put(alias, new Vector<ArrayList<Object>>());
   }
 
-  public void process(HiveObject row) throws HiveException {
+  InspectableObject tempAliasInspectableObject = new InspectableObject();
+  public void process(Object row, ObjectInspector rowInspector) throws HiveException {
     try {
       // get alias
-      Byte alias = (Byte) (aliasField.evaluate(row).getJavaObject());
+      aliasField.evaluate(row, rowInspector, tempAliasInspectableObject);
+      Byte alias = (Byte) (tempAliasInspectableObject.o);
 
       // get the expressions for that alias
       JoinExprMap exmap = joinExprs.get(alias);
       ExprNodeEvaluator[] valueFields = exmap.getValueFields();
 
       // Compute the values
-      CompositeHiveObject nr = new CompositeHiveObject(valueFields.length);
-      for (ExprNodeEvaluator vField : valueFields)
-        nr.addHiveObject(vField.evaluate(row));
+      ArrayList<Object> nr = new ArrayList<Object>(valueFields.length);
+      for (ExprNodeEvaluator vField : valueFields) {
+        vField.evaluate(row, rowInspector, tempAliasInspectableObject);
+        nr.add(tempAliasInspectableObject.o);
+      }
 
       // Add the value to the vector
       storage.get(alias).add(nr);
@@ -178,30 +185,29 @@
   }
 
   private void createForwardJoinObject(IntermediateObject intObj, boolean[] nullsArr) throws HiveException {
-    CompositeHiveObject nr = new CompositeHiveObject(totalSz);
+    ArrayList<Object> nr = new ArrayList<Object>(totalSz);
     for (int i = 0; i < numValues; i++) {
       Byte alias = order[i];
       int sz = joinExprs.get(alias).getValueFields().length;
-      if (nullsArr[i])
-        for (int j = 0; j < sz; j++)
-          nr.addHiveObject(null);
-      else
-      {
-        List <SerDeField> fields = joinExprs.get(alias).getListFields();
-        CompositeHiveObject obj = intObj.getObjs()[i];
-        for (int j = 0; j < sz; j++)
-          nr.addHiveObject(obj.get(fields.get(j)));
+      if (nullsArr[i]) {
+        for (int j = 0; j < sz; j++) {
+          nr.add(null);
+        }
+      } else {
+        ArrayList<Object> obj = intObj.getObjs()[i];
+        for (int j = 0; j < sz; j++) {
+          nr.add(obj.get(j));
+        }
       }
     }
-
-    forward(nr);
+    forward(nr, joinOutputObjectInspector);
   }
 
   private void copyOldArray(boolean[] src, boolean[] dest) {
     for (int i = 0; i < src.length; i++) dest[i] = src[i];
   }
 
-  private Vector<boolean[]> joinObjectsInnerJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, CompositeHiveObject newObj, IntermediateObject intObj, int left, boolean newObjNull)
+  private Vector<boolean[]> joinObjectsInnerJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, ArrayList<Object> newObj, IntermediateObject intObj, int left, boolean newObjNull)
   {
     if (newObjNull) return resNulls;
     Iterator<boolean[]> nullsIter = inputNulls.iterator();
@@ -219,7 +225,7 @@
     return resNulls;
   }
   
-  private Vector<boolean[]> joinObjectsLeftOuterJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, CompositeHiveObject newObj, IntermediateObject intObj, int left, boolean newObjNull)
+  private Vector<boolean[]> joinObjectsLeftOuterJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, ArrayList<Object> newObj, IntermediateObject intObj, int left, boolean newObjNull)
   {
     Iterator<boolean[]> nullsIter = inputNulls.iterator();
     while (nullsIter.hasNext())
@@ -237,7 +243,7 @@
     return resNulls;
   }
 
-  private Vector<boolean[]> joinObjectsRightOuterJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, CompositeHiveObject newObj, IntermediateObject intObj, int left, boolean newObjNull)
+  private Vector<boolean[]> joinObjectsRightOuterJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, ArrayList<Object> newObj, IntermediateObject intObj, int left, boolean newObjNull)
   {
     if (newObjNull) return resNulls;
     boolean allOldObjsNull = true;
@@ -276,7 +282,7 @@
     return resNulls;
   }
 
-  private Vector<boolean[]> joinObjectsFullOuterJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, CompositeHiveObject newObj, IntermediateObject intObj, int left, boolean newObjNull)
+  private Vector<boolean[]> joinObjectsFullOuterJoin(Vector<boolean[]> resNulls, Vector<boolean[]> inputNulls, ArrayList<Object> newObj, IntermediateObject intObj, int left, boolean newObjNull)
   {
     if (newObjNull) {
       Iterator<boolean[]> nullsIter = inputNulls.iterator();
@@ -344,7 +350,7 @@
    * list of nulls is changed appropriately. The list will contain all non-nulls
    * for a inner join. The outer joins are processed appropriately.
    */
-  private Vector<boolean[]> joinObjects(Vector<boolean[]> inputNulls, CompositeHiveObject newObj, IntermediateObject intObj, int joinPos)
+  private Vector<boolean[]> joinObjects(Vector<boolean[]> inputNulls, ArrayList<Object> newObj, IntermediateObject intObj, int joinPos)
   {
     Vector<boolean[]> resNulls = new Vector<boolean[]>();
     boolean newObjNull = newObj == dummyObj[joinPos] ? true : false;
@@ -395,11 +401,11 @@
   private void genObject(Vector<boolean[]> inputNulls, int aliasNum, IntermediateObject intObj) 
     throws HiveException {
     if (aliasNum < numValues) {
-      Iterator<CompositeHiveObject> aliasRes = storage.get(order[aliasNum])
+      Iterator<ArrayList<Object>> aliasRes = storage.get(order[aliasNum])
         .iterator();
       iterators.push(aliasRes);
       while (aliasRes.hasNext()) {
-        CompositeHiveObject newObj = aliasRes.next();
+        ArrayList<Object> newObj = aliasRes.next();
         intObj.pushObj(newObj);
         Vector<boolean[]> newNulls = joinObjects(inputNulls, newObj, intObj, aliasNum);
         genObject(newNulls, aliasNum + 1, intObj);
@@ -424,20 +430,24 @@
    */
   public void endGroup() throws HiveException {
     try {
-      l4j.trace("Join Op: endGroup called");
+      LOG.trace("Join Op: endGroup called: numValues=" + numValues);
 
       // does any result need to be emitted
       for (int i = 0; i < numValues; i++) {
         Byte alias = order[i];
         if (storage.get(alias).iterator().hasNext() == false) {
-          if (noOuterJoin)
+          if (noOuterJoin) {
+            LOG.trace("No data for alias=" + i);
             return;
-          else
+          } else {
             storage.put(alias, dummyObjVectors[i]);
+          }
         }
       }
 
-      genObject(null, 0, new IntermediateObject(new CompositeHiveObject[numValues], 0));
+      LOG.trace("calling genObject");
+      genObject(null, 0, new IntermediateObject(new ArrayList[numValues], 0));
+      LOG.trace("called genObject");
     } catch (Exception e) {
       e.printStackTrace();
       throw new HiveException(e);
@@ -449,7 +459,7 @@
    * 
    */
   public void close(boolean abort) throws HiveException {
-    l4j.trace("Join Op close");
+    LOG.trace("Join Op close");
     super.close(abort);
   }
 }

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/LabeledCompositeHiveObject.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/LabeledCompositeHiveObject.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/LabeledCompositeHiveObject.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/LabeledCompositeHiveObject.java Fri Sep 19 16:56:30 2008
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import org.apache.hadoop.hive.serde.*;
-/**
- * wrapper over composite hive object that attaches names to each field
- * (instead of the positional names of CompositeHiveObject)
- */
-public class LabeledCompositeHiveObject extends CompositeHiveObject {
-  String [] labels;
-
-  public LabeledCompositeHiveObject(int width) {
-    super(width);
-    throw new RuntimeException ("Labaled Hive Objects require field names");
-  }
-
-  public LabeledCompositeHiveObject(String [] labels) {
-    super(labels.length);
-    this.labels = labels;
-  }
-
-  @Override
-  public SerDeField getFieldFromExpression(String expr) {
-
-    int dot = expr.indexOf(".");
-    String label = expr;
-    if(dot != -1) {
-      assert(dot != (expr.length()-1));
-
-      label = expr.substring(0, dot);
-      expr =  expr.substring(dot+1);
-    } else {
-      expr = null;
-    }
-
-    for(int i=0; i<width; i++) {
-      if(label.equals(labels[i])) {
-        return new CompositeSerDeField(i, expr);
-      }
-    }
-    throw new RuntimeException ("Cannot match expression "+label+"."+expr+" against any label!");
-  }
-}

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Fri Sep 19 16:56:30 2008
@@ -31,10 +31,12 @@
 import org.apache.hadoop.hive.ql.plan.tableDesc;
 import org.apache.hadoop.hive.ql.plan.partitionDesc;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde.ConstantTypedSerDeField;
-import org.apache.hadoop.hive.serde.SerDe;
-import org.apache.hadoop.hive.serde.SerDeException;
-import org.apache.hadoop.hive.serde.SerDeField;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 
 /**
@@ -48,9 +50,16 @@
   private static final long serialVersionUID = 1L;
   public static enum Counter {DESERIALIZE_ERRORS}
   transient private LongWritable deserialize_error_count = new LongWritable ();
-  transient private SerDe decoder;
-  transient private ArrayList<String> partCols;
-  transient private ArrayList<SerDeField> partFields;
+  transient private Deserializer deserializer;
+  
+  transient private Object row;
+  transient private Object[] rowWithPart;
+  transient private StructObjectInspector rowObjectInspector;
+
+  transient private List<String> partNames;
+  transient private List<String> partValues;
+  transient private List<ObjectInspector> partObjectInspectors;
+  
 
   public void initialize(Configuration hconf) throws HiveException {
     super.initialize(hconf);
@@ -66,12 +75,12 @@
         // pick up work corresponding to this configuration path
         List<String> aliases = conf.getPathToAliases().get(onefile);
         for(String onealias: aliases) {
-          l4j.info("Adding alias " + onealias + " to work list for file " + fpath.toUri().getPath());
+          LOG.info("Adding alias " + onealias + " to work list for file " + fpath.toUri().getPath());
           todo.add(conf.getAliasToWork().get(onealias));
         }
 
         // initialize decoder once based on what table we are processing
-        if(decoder != null) {
+        if(deserializer != null) {
           continue;
         }
 
@@ -83,7 +92,7 @@
         HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME, String.valueOf(p.getProperty("name")));
         HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME, String.valueOf(partSpec));
         try {
-          Class sdclass = td.getSerdeClass();
+          Class sdclass = td.getDeserializerClass();
           if(sdclass == null) {
             String className = td.getSerdeClassName();
             if ((className == "") || (className == null)) {
@@ -91,28 +100,40 @@
             }
             sdclass = MapOperator.class.getClassLoader().loadClass(className);
           }
-          decoder = (SerDe) sdclass.newInstance();
-          decoder.initialize(hconf, p);
-
+          deserializer = (Deserializer) sdclass.newInstance();
+          deserializer.initialize(hconf, p);
+          rowObjectInspector = (StructObjectInspector)deserializer.getObjectInspector();
+          
           // Next check if this table has partitions and if so
           // get the list of partition names as well as allocate
           // the serdes for the partition columns
           String pcols = p.getProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_PARTITION_COLUMNS);
           if (pcols != null && pcols.length() > 0) {
-            partCols = new ArrayList<String>();
-            partFields = new ArrayList<SerDeField>();
-            String[] part_keys = pcols.trim().split("/");
-            for(String key: part_keys) {
-              partCols.add(key);
-              partFields.add(new ConstantTypedSerDeField(key, partSpec.get(key)));
+            partNames = new ArrayList<String>();
+            partValues = new ArrayList<String>();
+            partObjectInspectors = new ArrayList<ObjectInspector>();
+            String[] partKeys = pcols.trim().split("/");
+            for(String key: partKeys) {
+              partNames.add(key);
+              partValues.add(partSpec.get(key));
+              partObjectInspectors.add(
+                  ObjectInspectorFactory.getStandardPrimitiveObjectInspector(String.class));
             }
+            StructObjectInspector partObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(partNames, partObjectInspectors);
+            
+            rowWithPart = new Object[2];
+            rowWithPart[1] = partValues;
+            rowObjectInspector = ObjectInspectorFactory.getUnionStructObjectInspector(
+                Arrays.asList(new StructObjectInspector[]{
+                    rowObjectInspector, 
+                    partObjectInspector}));
           }
           else {
-            partCols = null;
-            partFields = null;
+            partNames = null;
+            partValues = null;
           }
 
-          l4j.info("Got partitions: " + pcols);
+          LOG.info("Got partitions: " + pcols);
         } catch (SerDeException e) {
           e.printStackTrace();
           throw new HiveException (e);
@@ -129,7 +150,7 @@
     if(todo.size() == 0) {
       // didn't find match for input file path in configuration!
       // serious problem ..
-      l4j.error("Configuration does not have any alias for path: " + fpath.toUri().getPath());
+      LOG.error("Configuration does not have any alias for path: " + fpath.toUri().getPath());
       throw new HiveException("Configuration and input path are inconsistent");
     }
 
@@ -146,19 +167,24 @@
     }
   }
 
-  public void process(HiveObject r) throws HiveException {
-    throw new RuntimeException("Should not be called!");
-  }
-
   public void process(Writable value) throws HiveException {
     try {
-      Object ev = decoder.deserialize(value);
-      HiveObject ho = new TableHiveObject(ev, decoder, partCols, partFields);
-      forward(ho);
+      if (partNames == null) {
+        row = deserializer.deserialize(value);
+        forward(row, rowObjectInspector);
+      } else {
+        rowWithPart[0] = deserializer.deserialize(value);
+        forward(rowWithPart, rowObjectInspector);
+      }
     } catch (SerDeException e) {
       // TODO: policy on deserialization errors
       deserialize_error_count.set(deserialize_error_count.get()+1);
       throw new HiveException (e);
     }
   }
+
+  public void process(Object row, ObjectInspector rowInspector)
+      throws HiveException {
+    throw new HiveException("Hive 2 Internal error: should not be called!");
+  }
 }

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MapRedTask.java Fri Sep 19 16:56:30 2008
@@ -42,6 +42,7 @@
   public int execute() {
 
     try {
+      // enable assertion
       String hadoopExec = conf.getVar(HiveConf.ConfVars.HADOOPBIN);
       String hiveJar = conf.getJar();
       String hiveConfArgs = ExecDriver.generateCmdLine(conf);

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/NullHiveObject.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/NullHiveObject.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/NullHiveObject.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/NullHiveObject.java Fri Sep 19 16:56:30 2008
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import java.util.*;
-
-import org.apache.hadoop.hive.serde.*;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-
-/**
- * Represents a NULL object
- */
-public class NullHiveObject extends HiveObject {
-
-  public NullHiveObject() {
-    setIsNull(true);
-  }
-
-  public SerDeField getFieldFromExpression(String expr) throws HiveException {
-    return null;
-  }
-
-  public HiveObject get(SerDeField field) throws HiveException {
-    return this;
-  }
-
-  public List<SerDeField> getFields() throws HiveException {
-    return null;
-  }
-
-  public boolean isPrimitive() { return false;}
-
-  public boolean equals(Object other) {
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    throw new RuntimeException("not supported");
-  }
-}

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Fri Sep 19 16:56:30 2008
@@ -23,6 +23,8 @@
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.mapredWork;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.conf.Configuration;
@@ -86,7 +88,7 @@
 
   transient protected HashMap<Enum<?>, LongWritable> statsMap = new HashMap<Enum<?>, LongWritable> ();
   transient protected OutputCollector out;
-  transient protected Log l4j;
+  transient protected Log LOG = LogFactory.getLog(this.getClass().getName());;
   transient protected mapredWork gWork;
   transient protected String alias;
   transient protected String joinAlias;
@@ -159,50 +161,47 @@
   }
 
   public void initialize (Configuration hconf) throws HiveException {
-    l4j = LogFactory.getLog(this.getClass().getName());
-    l4j.info("Initializing Self");
+    LOG.info("Initializing Self");
     
     if(childOperators == null) {
       return;
     }
-    l4j.info("Initializing children:");
+    LOG.info("Initializing children:");
     for(Operator<? extends Serializable> op: childOperators) {
       op.initialize(hconf);
     }    
-    l4j.info("Initialization Done");
+    LOG.info("Initialization Done");
   }
 
-  public abstract void process(HiveObject r) throws HiveException;
+  public abstract void process(Object row, ObjectInspector rowInspector) throws HiveException;
  
   // If a operator wants to do some work at the beginning of a group
   public void startGroup() throws HiveException {
-    l4j = LogFactory.getLog(this.getClass().getName());
-    l4j.trace("Starting group");
+    LOG.debug("Starting group");
     
     if (childOperators == null)
       return;
     
-    l4j.trace("Starting group for children:");
+    LOG.debug("Starting group for children:");
     for (Operator<? extends Serializable> op: childOperators)
       op.startGroup();
     
-    l4j.trace("Start group Done");
+    LOG.debug("Start group Done");
   }  
   
   // If a operator wants to do some work at the beginning of a group
   public void endGroup() throws HiveException
   {
-     l4j = LogFactory.getLog(this.getClass().getName());
-    l4j.trace("Ending group");
+    LOG.debug("Ending group");
     
     if (childOperators == null)
       return;
     
-    l4j.trace("Ending group for children:");
+    LOG.debug("Ending group for children:");
     for (Operator<? extends Serializable> op: childOperators)
       op.endGroup();
     
-    l4j.trace("Start group Done");
+    LOG.debug("End group Done");
   }
 
   public void close(boolean abort) throws HiveException {
@@ -218,12 +217,13 @@
     }
   }
 
-  protected void forward(HiveObject r) throws HiveException {
+  protected void forward(Object row, ObjectInspector rowInspector) throws HiveException {
+    
     if(childOperators == null) {
       return;
     }
     for(Operator<? extends Serializable> o: childOperators) {
-      o.process(r);
+      o.process(row, rowInspector);
     }
   }
 
@@ -249,7 +249,7 @@
 
   public void logStats () {
     for(Enum<?> e: statsMap.keySet()) {
-      l4j.info(e.toString() + ":" + statsMap.get(e).toString());
+      LOG.info(e.toString() + ":" + statsMap.get(e).toString());
     }    
   }
 

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/PrimitiveHiveObject.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/PrimitiveHiveObject.java?rev=697291&r1=697290&r2=697291&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/PrimitiveHiveObject.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/PrimitiveHiveObject.java Fri Sep 19 16:56:30 2008
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec;
-
-import java.util.*;
-
-import org.apache.hadoop.hive.serde.*;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-
-/**
- * Encapsulation for a primitive Java Object
- */
-
-public final class PrimitiveHiveObject extends HiveObject {
-
-  public PrimitiveHiveObject(Object javaObject) {
-
-    this.javaObject = javaObject;
-  }
-
-  public SerDeField getFieldFromExpression(String expr) throws HiveException {
-    throw new HiveException ("Illegal call getFieldFromExpression() on Primitive Object");
-  }
-
-  public HiveObject get(SerDeField field) throws HiveException {
-    throw new HiveException ("Illegal call get() on Primitive Object");
-  }
-
-  public List<SerDeField> getFields() throws HiveException {
-    throw new HiveException ("Illegal call getFields() on Primitive Object");
-  }
-
-  public boolean isPrimitive() { return true; }
-
-  @Override
-  public String toString () {
-    return (javaObject == null ? "" : javaObject.toString());
-  }
-
-  @Override
-  public int hashCode() {
-    return (javaObject == null ? 0 : javaObject.hashCode());
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (! (other instanceof PrimitiveHiveObject)) return false;
-    return javaObject == null ? ((PrimitiveHiveObject)other).javaObject == null
-        : javaObject.equals(((PrimitiveHiveObject)other).javaObject);
-  }
-}



Mime
View raw message