parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jul...@apache.org
Subject git commit: Parquet-70: Fixed storing pig schema to udfcontext for non projection case and moved...
Date Wed, 20 Aug 2014 20:52:54 GMT
Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master 7af955a0a -> 7b415faae


Parquet-70: Fixed storing pig schema to udfcontext for non projection case and moved...

... column index access setting to udfcontext so as not to affect other loaders.

I found an problem that affects both the Column name access and column index access due to
the way the pig schema is stored by the loader.

##Column Name Access:
The ParquetLoader was only storing the pig schema in the UDFContext when push projection is
applied.  In the full load case, the schema was not stored which triggered a full reload of
the schema during task execution.  You can see in initSchema references the UDFContext for
the schema, but that is only set in push projection.  However, the schema needs to be set
in both the job context (so the TupleReadSupport can access the schema) and the UDFContext
(so the task side loader can access it), which is why it is set in both locations.  This also
meant the requested schema was never set to the task side either, which could cause other
problems as well.

##Column Index Access:
For index based access, the problem was that the column index access setting and the requested
schema were not stored in the udfcontext and sent to the task side (unless pushProjection
was called).  The schema was stored in the job context, but this would be overwritten if another
loader was executed first.  Also, the property to use column index access was only being set
at the job context level, so subsequent loaders would use column index access even if they
didn't request it.

This fix now ensures that both the schema and column index access are set in the udfcontext
and loaded in the initSchema method.

JIRA: https://issues.apache.org/jira/browse/PARQUET-70

-Dan

Author: Daniel Weeks <dweeks@netflix.com>

Closes #36 from dcw-netflix/pig-schema-context and squashes the following commits:

f896a25 [Daniel Weeks] Moved property loading into setInput
8f3dc28 [Daniel Weeks] Changed to set job conf settings in both front and backend
d758de0 [Daniel Weeks] Updated to use isFrontend() for setting context properties
b7ef96a [Daniel Weeks] Fixed storing pig schema to udfcontext for non projection case and
moved column index access setting to udfcontext so as not to affect other loaders.


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/7b415faa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/7b415faa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/7b415faa

Branch: refs/heads/master
Commit: 7b415faaed09eba1103ea30577ef1a32fba7048c
Parents: 7af955a
Author: Daniel Weeks <dweeks@netflix.com>
Authored: Wed Aug 20 13:52:42 2014 -0700
Committer: julien <julien@twitter.com>
Committed: Wed Aug 20 13:52:42 2014 -0700

----------------------------------------------------------------------
 .../main/java/parquet/pig/ParquetLoader.java    | 35 +++++++++++++-------
 1 file changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/7b415faa/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java b/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
index 91d68bd..91dc0da 100644
--- a/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
+++ b/parquet-pig/src/main/java/parquet/pig/ParquetLoader.java
@@ -135,23 +135,37 @@ public class ParquetLoader extends LoadFunc implements LoadMetadata,
LoadPushDow
   @Override
   public void setLocation(String location, Job job) throws IOException {
     if (DEBUG) LOG.debug("LoadFunc.setLocation(" + location + ", " + job + ")");
-    setInput(location, job);
-    getConfiguration(job).set(PARQUET_PIG_SCHEMA, pigSchemaToString(schema));
     
-    if(requiredFieldList != null) {
-      getConfiguration(job).set(PARQUET_PIG_REQUIRED_FIELDS, serializeRequiredFieldList(requiredFieldList));
-    }
-    
-    if(this.columnIndexAccess) {
-        getConfiguration(job).set(PARQUET_COLUMN_INDEX_ACCESS, Boolean.toString(columnIndexAccess));
-    }
+    setInput(location, job);
   }
 
   private void setInput(String location, Job job) throws IOException {
     this.setLocationHasBeenCalled  = true;
     this.location = location;
     setInputPaths(job, location);
+    
+    //This is prior to load because the initial value comes from the constructor
+    //not file metadata or pig framework and would get overwritten in initSchema().
+    if(UDFContext.getUDFContext().isFrontend()) {
+      storeInUDFContext(PARQUET_COLUMN_INDEX_ACCESS, Boolean.toString(columnIndexAccess));
+    }
+    
+    schema = PigSchemaConverter.parsePigSchema(getPropertyFromUDFContext(PARQUET_PIG_SCHEMA));
+    requiredFieldList = PigSchemaConverter.deserializeRequiredFieldList(getPropertyFromUDFContext(PARQUET_PIG_REQUIRED_FIELDS));
+    columnIndexAccess = Boolean.parseBoolean(getPropertyFromUDFContext(PARQUET_COLUMN_INDEX_ACCESS));
+    
     initSchema(job);
+    
+    if(UDFContext.getUDFContext().isFrontend()) {
+      //Setting for task-side loading via initSchema()
+      storeInUDFContext(PARQUET_PIG_SCHEMA, pigSchemaToString(schema));
+      storeInUDFContext(PARQUET_PIG_REQUIRED_FIELDS, serializeRequiredFieldList(requiredFieldList));
+    }
+    
+    //Used by task-side loader via TupleReadSupport
+    getConfiguration(job).set(PARQUET_PIG_SCHEMA, pigSchemaToString(schema));
+    getConfiguration(job).set(PARQUET_PIG_REQUIRED_FIELDS, serializeRequiredFieldList(requiredFieldList));
+    getConfiguration(job).set(PARQUET_COLUMN_INDEX_ACCESS, Boolean.toString(columnIndexAccess));
   }
 
   @Override
@@ -239,9 +253,6 @@ public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDow
     if (schema != null) {
       return;
     }
-    schema = PigSchemaConverter.parsePigSchema(getPropertyFromUDFContext(PARQUET_PIG_SCHEMA));
-    requiredFieldList = PigSchemaConverter.deserializeRequiredFieldList(getPropertyFromUDFContext(PARQUET_PIG_REQUIRED_FIELDS));
-    columnIndexAccess = columnIndexAccess || Boolean.parseBoolean(getPropertyFromUDFContext(PARQUET_COLUMN_INDEX_ACCESS));
     if (schema == null && requestedSchema != null) {
       // this is only true in front-end
       schema = requestedSchema;


Mime
View raw message