incubator-hcatalog-commits mailing list archives

From ga...@apache.org
Subject svn commit: r1294442 - in /incubator/hcatalog/trunk: CHANGES.txt src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
Date Tue, 28 Feb 2012 01:50:50 GMT
Author: gates
Date: Tue Feb 28 01:50:50 2012
New Revision: 1294442

URL: http://svn.apache.org/viewvc?rev=1294442&view=rev
Log:
HCATALOG-278 When outputSchema doesn't match the table schema, wrong columns are returned to the user
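
The heart of the fix, for readers skimming the diff below: each column requested
in outputSchema is now resolved by name against the table schema instead of
being read back by ordinal position. A minimal, self-contained sketch of that
idea, with plain Java collections standing in for HCatSchema/HCatRecord (the
class and method names here are illustrative only, not part of the patch):

    import java.util.Arrays;
    import java.util.List;

    public class ProjectionSketch {

        // Resolve each output column by name against the table schema,
        // mirroring dr.set(i, r.get(ofs.getName(), tableSchema)) in the patch.
        static Object[] project(List<String> tableSchema, List<Object> row,
                                List<String> outputSchema) {
            Object[] out = new Object[outputSchema.size()];
            for (int i = 0; i < outputSchema.size(); i++) {
                int pos = tableSchema.indexOf(outputSchema.get(i));
                out[i] = row.get(pos);
            }
            return out;
        }

        public static void main(String[] args) {
            List<String> tableSchema = Arrays.asList("a", "b", "c");
            List<Object> row = Arrays.<Object>asList(1, 2, 3);
            // A reordered projection ("c", "a") now yields [3, 1]; reading by
            // position, as before this fix, would have yielded [1, 2].
            System.out.println(Arrays.toString(
                project(tableSchema, row, Arrays.asList("c", "a"))));
        }
    }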

Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1294442&r1=1294441&r2=1294442&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Tue Feb 28 01:50:50 2012
@@ -52,6 +52,8 @@ Trunk (unreleased changes)
   OPTIMIZATIONS
 
   BUG FIXES
+  HCAT-278 When outputSchema doesn't match the table schema, wrong columns are returned to the user (gates)
+
   HCAT-276 After merging in HCATALOG-237 related changes Pig scripts with more than one store fail (daijy via gates)
 
   HCAT-257 e2e harness not working properly after file location change (gates)

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java?rev=1294442&r1=1294441&r2=1294442&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java Tue Feb 28 01:50:50 2012
@@ -36,10 +36,13 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.SerDe;
 
+import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.LazyHCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
 
 /** The HCat wrapper for the underlying RecordReader, 
  * this ensures that the initialize on
@@ -64,6 +67,9 @@ class HCatRecordReader extends RecordRea
 
     private Map<Integer,Object> partCols;
 
+    private HCatSchema outputSchema = null;
+    private HCatSchema tableSchema = null;
+
     /**
      * Instantiates a new hcat record reader.
      * @param baseRecordReader the base record reader
@@ -89,6 +95,10 @@ class HCatRecordReader extends RecordRea
                            TaskAttemptContext taskContext)
     throws IOException, InterruptedException {
         org.apache.hadoop.mapred.InputSplit baseSplit;
+        
+        // Pull the output schema out of the TaskAttemptContext
+        outputSchema = (HCatSchema)HCatUtil.deserialize(
+          taskContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA));
 
         if( split instanceof HCatSplit ) {
             baseSplit = ((HCatSplit) split).getBaseSplit();
@@ -96,6 +106,10 @@ class HCatRecordReader extends RecordRea
           throw new IOException("Not a HCatSplit");
         }
 
+        // Pull the table schema out of the Split info
+        // TODO This should be passed in the TaskAttemptContext instead
+        tableSchema = ((HCatSplit)split).getTableSchema();
+
         Properties properties = new Properties();
         for (Map.Entry<String, String>param : 
             ((HCatSplit)split).getPartitionInfo()
@@ -122,14 +136,24 @@
       HCatRecord r;
 
       try {
-        r = new DefaultHCatRecord((new LazyHCatRecord(
-                                            serde.deserialize(currentValue), 
-                               serde.getObjectInspector(), 
-                               partCols)).getAll());
+        r = new LazyHCatRecord(serde.deserialize(currentValue),
+          serde.getObjectInspector(), partCols);
+        if (outputSchema == null) {
+          // there's no projection being done
+          return new DefaultHCatRecord(r.getAll());
+        } else {
+          // For each field in the outputSchema, do the mapping
+          DefaultHCatRecord dr = new DefaultHCatRecord(outputSchema.size());
+          for (int i = 0; i < outputSchema.size(); i++) {
+            // Figure out the field to read
+            HCatFieldSchema ofs = outputSchema.get(i);
+            dr.set(i, r.get(ofs.getName(), tableSchema));
+          }
+          return dr;
+        }
       } catch (Exception e) { 
         throw new IOException("Failed to create HCatRecord " + e);
       }
-      return r; 
     }
 
     /* (non-Javadoc)

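The write side of this handshake, for completeness: the projection schema lands
in the job configuration under HCatConstants.HCAT_KEY_OUTPUT_SCHEMA, the key
initialize() above reads back. A sketch of that step, assuming HCatUtil.serialize
is the symmetric counterpart of the HCatUtil.deserialize call in the patch
(HCatInputFormat exposes a setOutputSchema helper for this; the holder class
below is hypothetical):

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.Job;

    import org.apache.hcatalog.common.HCatConstants;
    import org.apache.hcatalog.common.HCatUtil;
    import org.apache.hcatalog.data.schema.HCatSchema;

    // Hypothetical holder class, for illustration only.
    public class OutputSchemaSketch {
        // Serialize the projection schema into the configuration under the
        // same key that HCatRecordReader.initialize() deserializes; assumes
        // HCatUtil.serialize mirrors the deserialize call seen in the patch.
        static void setOutputSchema(Job job, HCatSchema outputSchema)
            throws IOException {
            job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA,
                HCatUtil.serialize(outputSchema));
        }
    }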

