pig-commits mailing list archives

From roh...@apache.org
Subject svn commit: r1709122 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/o...
Date Fri, 16 Oct 2015 22:44:35 GMT
Author: rohini
Date: Fri Oct 16 22:44:34 2015
New Revision: 1709122

URL: http://svn.apache.org/viewvc?rev=1709122&view=rev
Log:
PIG-4697: Pig needs to serialize only part of the udfcontext for each vertex (rohini)
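
In short, instead of serializing the entire UDFContext into every vertex payload, a new
TezUDFContextSeparator walks the Tez plan once, attributes UDFContext entries to the
vertices, edges and stores that actually use them, and each payload then carries only its
own slice. A minimal sketch of the new call pattern, condensed from the TezDagBuilder diff
below (variable names as in the patch):

    // Before: every payload carried the full UDFContext.
    // UDFContext.getUDFContext().serialize(payloadConf);

    // After: walk the plan once and index UDFContext keys per vertex and edge.
    TezUDFContextSeparator separator = new TezUDFContextSeparator(plan,
            new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
    separator.visit();

    // Vertex payload: LoadFunc, StoreFunc and UserFunc entries for this vertex.
    separator.serializeUDFContext(payloadConf, tezOp);
    // Input payload: only the LoadFunc entries.
    separator.serializeUDFContext(inputPayLoad, tezOp, UDFType.LOADFUNC);
    // Output payload: only the entries for the given POStore.
    separator.serializeUDFContext(outPayLoad, tezOp, store);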

Added:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigInputFormatTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezUDFContextSeparator.java
    pig/trunk/src/org/apache/pig/impl/util/UDFContextSeparator.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java
    pig/trunk/src/org/apache/pig/impl/util/UDFContext.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1709122&r1=1709121&r2=1709122&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Oct 16 22:44:34 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4697: Pig needs to serialize only part of the udfcontext for each vertex (rohini)
+
 PIG-4702: Load once for sampling and partitioning in order by for certain LoadFuncs (rohini)
 
 PIG-4699: Print Job stats information in Tez like mapreduce (rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1709122&r1=1709121&r2=1709122&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Fri Oct 16 22:44:34 2015
@@ -693,13 +693,13 @@ public class JobControlCompiler{
             if(Utils.isLocal(pigContext, conf)) {
                 ConfigurationUtil.replaceConfigForLocalMode(conf);
             }
-            conf.set("pig.inputs", ObjectSerializer.serialize(inp));
-            conf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
-            conf.set("pig.inpSignatures", ObjectSerializer.serialize(inpSignatureLists));
-            conf.set("pig.inpLimits", ObjectSerializer.serialize(inpLimits));
+            conf.set(PigInputFormat.PIG_INPUTS, ObjectSerializer.serialize(inp));
+            conf.set(PigInputFormat.PIG_INPUT_TARGETS, ObjectSerializer.serialize(inpTargets));
+            conf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(inpSignatureLists));
+            conf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(inpLimits));
 
             // Removing job credential entry before serializing pigcontext into jobconf
-            // since this path would be invalid for the new job being created 
+            // since this path would be invalid for the new job being created
             pigContext.getProperties().remove("mapreduce.job.credentials.binary");
 
             conf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1709122&r1=1709121&r2=1709122&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Fri Oct 16 22:44:34 2015
@@ -57,6 +57,9 @@ public class PigInputFormat extends Inpu
             .getLog(PigInputFormat.class);
 
     public static final String PIG_INPUTS = "pig.inputs";
+    public static final String PIG_INPUT_TARGETS = "pig.inpTargets";
+    public static final String PIG_INPUT_SIGNATURES = "pig.inpSignatures";
+    public static final String PIG_INPUT_LIMITS = "pig.inpLimits";
 
     /**
      * @deprecated Use {@link UDFContext} instead in the following way to get
@@ -109,7 +112,7 @@ public class PigInputFormat extends Inpu
 
         List<Long> inpLimitLists =
                 (ArrayList<Long>)ObjectSerializer.deserialize(
-                        conf.get("pig.inpLimits"));
+                        conf.get(PIG_INPUT_LIMITS));
 
         return new PigRecordReader(inputFormat, pigSplit, loadFunc, context, inpLimitLists.get(pigSplit.getInputIndex()));
     }
@@ -171,7 +174,7 @@ public class PigInputFormat extends Inpu
             Configuration conf) throws IOException {
         List<String> inpSignatureLists =
                 (ArrayList<String>)ObjectSerializer.deserialize(
-                        conf.get("pig.inpSignatures"));
+                        conf.get(PIG_INPUT_SIGNATURES));
         // signature can be null for intermediate jobs where it will not
         // be required to be passed down
         if(inpSignatureLists.get(inputIndex) != null) {
@@ -197,9 +200,9 @@ public class PigInputFormat extends Inpu
         PigContext pigContext;
         try {
             inputs = (ArrayList<FileSpec>) ObjectSerializer
-                    .deserialize(conf.get("pig.inputs"));
+                    .deserialize(conf.get(PIG_INPUTS));
             inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer
-                    .deserialize(conf.get("pig.inpTargets"));
+                    .deserialize(conf.get(PIG_INPUT_TARGETS));
             pigContext = (PigContext) ObjectSerializer.deserialize(conf
                     .get("pig.pigContext"));
             PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(conf.get("udf.import.list")));

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1709122&r1=1709121&r2=1709122&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Fri Oct 16 22:44:34 2015
@@ -605,6 +605,10 @@ public class POUserFunc extends Expressi
         return func;
     }
 
+    public String getSignature() {
+        return signature;
+    }
+
     public void setSignature(String signature) {
         this.signature = signature;
         if (this.func!=null) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1709122&r1=1709121&r2=1709122&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Fri Oct 16 22:44:34 2015
@@ -98,14 +98,17 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PartitionerDefinedVertexManager;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigGraceShuffleVertexManager;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigInputFormatTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigOutputFormatTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezUDFContextSeparator;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.builtin.DefaultIndexableLoader;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
@@ -114,6 +117,7 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.UDFContextSeparator.UDFType;
 import org.apache.pig.tools.pigstats.tez.TezScriptState;
 import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo;
 import org.apache.tez.common.TezUtils;
@@ -169,6 +173,11 @@ public class TezDagBuilder extends TezOp
     private FileSystem fs;
     private long intermediateTaskInputSize;
     private Set<String> inputSplitInDiskVertices;
+    private TezUDFContextSeparator udfContextSeparator;
+
+    private String serializedTezPlan;
+    private String serializedPigContext;
+    private String serializedUDFImportList;
 
     public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
             Map<String, LocalResource> localResources) {
@@ -200,6 +209,25 @@ public class TezDagBuilder extends TezOp
                 globalConf.getLong(
                         InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                         InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
+
+        try {
+            serializedPigContext = ObjectSerializer.serialize(pc);
+            serializedUDFImportList = ObjectSerializer.serialize(PigContext.getPackageImportList());
+
+            udfContextSeparator = new TezUDFContextSeparator(plan,
+                    new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+            udfContextSeparator.visit();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public String getSerializedTezPlan() throws IOException {
+        if (serializedTezPlan == null) {
+            // Initialized lazily, as auto parallelism might not be in play
+            serializedTezPlan = ObjectSerializer.serialize(getPlan());
+        }
+        return serializedTezPlan;
     }
 
     // Hack to turn off relocalization till TEZ-2192 is fixed.
@@ -339,8 +367,9 @@ public class TezDagBuilder extends TezOp
         OutputDescriptor out = OutputDescriptor.create(edge.outputClassName);
 
         Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
-        UDFContext.getUDFContext().serialize(conf);
+
         if (!combinePlan.isEmpty()) {
+            udfContextSeparator.serializeUDFContextForEdge(conf, from, to, UDFType.USERFUNC);
             addCombiner(combinePlan, to, conf, isMergedInput);
         }
 
@@ -377,9 +406,8 @@ public class TezDagBuilder extends TezOp
 
         conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
         conf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
-        conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
-        conf.set("udf.import.list",
-                ObjectSerializer.serialize(PigContext.getPackageImportList()));
+        conf.set("pig.pigContext", serializedPigContext);
+        conf.set("udf.import.list", serializedUDFImportList);
 
         if(to.isGlobalSort() || to.isLimitAfterSort()){
             conf.set("pig.sortOrder",
@@ -407,9 +435,6 @@ public class TezDagBuilder extends TezOp
                     edge.partitionerClass.getName());
         }
 
-        conf.set("udf.import.list",
-                ObjectSerializer.serialize(PigContext.getPackageImportList()));
-
         MRToTezHelper.processMRSettings(conf, globalConf);
 
         in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
@@ -446,11 +471,6 @@ public class TezDagBuilder extends TezOp
                 MRCombiner.class.getName());
         conf.set(MRJobConfig.COMBINE_CLASS_ATTR,
                 PigCombiner.Combine.class.getName());
-        conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
-        conf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
-        conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
-        conf.set("udf.import.list",
-                ObjectSerializer.serialize(PigContext.getPackageImportList()));
         conf.set("pig.combinePlan", ObjectSerializer.serialize(combinePlan));
         conf.set("pig.combine.package", ObjectSerializer.serialize(combPack));
         conf.set("pig.map.keytype", ObjectSerializer
@@ -473,6 +493,37 @@ public class TezDagBuilder extends TezOp
         @SuppressWarnings("deprecation")
         Job job = new Job(payloadConf);
         payloadConf = (JobConf) job.getConfiguration();
+        payloadConf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
+        payloadConf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
+        payloadConf.setClass(MRConfiguration.INPUTFORMAT_CLASS,
+                PigInputFormatTez.class, InputFormat.class);
+        setOutputFormat(job);
+        payloadConf.set("udf.import.list", serializedUDFImportList);
+        payloadConf.set("exectype", "TEZ");
+        MRToTezHelper.processMRSettings(payloadConf, globalConf);
+
+        // Process stores
+        LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
+
+        Configuration inputPayLoad = null;
+        Configuration outputPayLoad = null;
+
+        if (!stores.isEmpty()) {
+            outputPayLoad = new Configuration(payloadConf);
+            outputPayLoad.set(JobControlCompiler.PIG_MAP_STORES,
+                    ObjectSerializer.serialize(new ArrayList<POStore>()));
+        }
+
+        if (!(tezOp.getLoaderInfo().getLoads().isEmpty())) {
+            payloadConf.set(PigInputFormat.PIG_INPUTS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInp()));
+            payloadConf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists()));
+            payloadConf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits()));
+            inputPayLoad = new Configuration(payloadConf);
+            if (tezOp.getLoaderInfo().getLoads().get(0).getLoadFunc() instanceof DefaultIndexableLoader) {
+                inputPayLoad.set("pig.pigContext", serializedPigContext);
+            }
+        }
+        payloadConf.set("pig.pigContext", serializedPigContext);
 
         if (tezOp.getSampleOperator() != null) {
             payloadConf.set(PigProcessor.SAMPLE_VERTEX, tezOp.getSampleOperator().getOperatorKey().toString());
@@ -494,21 +545,6 @@ public class TezDagBuilder extends TezOp
 
         }
 
-        payloadConf.set("pig.inputs", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInp()));
-        payloadConf.set("pig.inpSignatures", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists()));
-        payloadConf.set("pig.inpLimits", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits()));
-        // Process stores
-        LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
-
-        payloadConf.set("pig.pigContext", ObjectSerializer.serialize(pc));
-        payloadConf.set("udf.import.list",
-                ObjectSerializer.serialize(PigContext.getPackageImportList()));
-        payloadConf.set("exectype", "TEZ");
-        payloadConf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
-        payloadConf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
-        payloadConf.setClass(MRConfiguration.INPUTFORMAT_CLASS,
-                PigInputFormat.class, InputFormat.class);
-
         // Set parent plan for all operators in the Tez plan.
         new PhyPlanSetter(tezOp.plan).visit();
 
@@ -582,7 +618,6 @@ public class TezDagBuilder extends TezOp
             //POShuffleTezLoad accesses the comparator setting
             selectKeyComparator(keyType, payloadConf, tezOp, isMergedInput);
         }
-        setOutputFormat(job);
 
         // set parent plan in all operators. currently the parent plan is really
         // used only when POStream, POSplit are present in the plan
@@ -592,9 +627,7 @@ public class TezDagBuilder extends TezOp
         payloadConf.set(PigProcessor.PLAN,
                 ObjectSerializer.serialize(tezOp.plan));
 
-        UDFContext.getUDFContext().serialize(payloadConf);
-
-        MRToTezHelper.processMRSettings(payloadConf, globalConf);
+        udfContextSeparator.serializeUDFContext(payloadConf, tezOp);
 
         if (!pc.inIllustrator) {
             for (POStore store : stores) {
@@ -660,19 +693,19 @@ public class TezDagBuilder extends TezOp
                     }
                 }
                 if (containScatterGather && !containCustomPartitioner) {
+                    vmPluginConf = (vmPluginConf == null) ? ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
                     // Use auto-parallelism feature of ShuffleVertexManager to dynamically
                     // reduce the parallelism of the vertex
                     if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
                             && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()) {
                         vmPluginName = PigGraceShuffleVertexManager.class.getName();
                         tezOp.setUseGraceParallelism(true);
+                        vmPluginConf.set("pig.tez.plan", getSerializedTezPlan());
+                        vmPluginConf.set("pig.pigContext", serializedPigContext);
                     } else {
                         vmPluginName = ShuffleVertexManager.class.getName();
                     }
-                    vmPluginConf = (vmPluginConf == null) ? ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
                     vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
-                    vmPluginConf.set("pig.tez.plan", ObjectSerializer.serialize(getPlan()));
-                    vmPluginConf.set("pig.pigContext", ObjectSerializer.serialize(pc));
                     if (stores.size() <= 0) {
                         // Intermediate reduce. Set the bytes per reducer to be block size.
                         vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
@@ -769,7 +802,7 @@ public class TezDagBuilder extends TezOp
                 MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
                 int splitsSerializedSize = splitsProto.getSerializedSize();
                 if(splitsSerializedSize > spillThreshold) {
-                    payloadConf.setBoolean(
+                    inputPayLoad.setBoolean(
                             org.apache.tez.mapreduce.hadoop.MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
                             false);
                     // Write splits to disk
@@ -794,7 +827,9 @@ public class TezDagBuilder extends TezOp
                 //Free up memory
                 tezOp.getLoaderInfo().setInputSplitInfo(null);
             }
-            userPayLoadBuilder.setConfigurationBytes(TezUtils.createByteStringFromConf(payloadConf));
+
+            udfContextSeparator.serializeUDFContext(inputPayLoad, tezOp, UDFType.LOADFUNC);
+            userPayLoadBuilder.setConfigurationBytes(TezUtils.createByteStringFromConf(inputPayLoad));
 
             vertex.setLocationHint(VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()));
             vertex.addDataSource(ld.getOperatorKey().toString(),
@@ -811,19 +846,17 @@ public class TezDagBuilder extends TezOp
         Set<String> uniqueStoreOutputs = new HashSet<String>();
         for (POStore store : stores) {
 
-            ArrayList<POStore> emptyList = new ArrayList<POStore>();
             ArrayList<POStore> singleStore = new ArrayList<POStore>();
             singleStore.add(store);
 
-            Configuration outputPayLoad = new Configuration(payloadConf);
-            outputPayLoad.set(JobControlCompiler.PIG_MAP_STORES,
-                    ObjectSerializer.serialize(emptyList));
-            outputPayLoad.set(JobControlCompiler.PIG_REDUCE_STORES,
+            Configuration outPayLoad = new Configuration(outputPayLoad);
+            udfContextSeparator.serializeUDFContext(outPayLoad, tezOp, store);
+            outPayLoad.set(JobControlCompiler.PIG_REDUCE_STORES,
                     ObjectSerializer.serialize(singleStore));
 
             OutputDescriptor storeOutDescriptor = OutputDescriptor.create(
                     MROutput.class.getName()).setUserPayload(TezUtils
-                    .createUserPayloadFromConf(outputPayLoad));
+                    .createUserPayloadFromConf(outPayLoad));
             if (tezOp.getVertexGroupStores() != null) {
                 OperatorKey vertexGroupKey = tezOp.getVertexGroupStores().get(store.getOperatorKey());
                 if (vertexGroupKey != null) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1709122&r1=1709121&r2=1709122&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java Fri Oct 16 22:44:34 2015
@@ -134,6 +134,7 @@ public class LoaderProcessor extends Tez
                 // Now add the input handling operator for the Tez backend
                 // TODO: Move this upstream to the PhysicalPlan generation
                 POSimpleTezLoad tezLoad = new POSimpleTezLoad(ld.getOperatorKey(), ld.getLFile());
+                tezLoad.setSignature(ld.getSignature());
                 tezLoad.setInputKey(ld.getOperatorKey().toString());
                 tezLoad.copyAliasFrom(ld);
                 tezLoad.setCacheFiles(ld.getCacheFiles());
@@ -146,10 +147,10 @@ public class LoaderProcessor extends Tez
             UDFContext.getUDFContext().serialize(conf);
             conf.set("udf.import.list",
                     ObjectSerializer.serialize(PigContext.getPackageImportList()));
-            conf.set("pig.inputs", ObjectSerializer.serialize(inp));
-            conf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
-            conf.set("pig.inpSignatures", ObjectSerializer.serialize(inpSignatureLists));
-            conf.set("pig.inpLimits", ObjectSerializer.serialize(inpLimits));
+            conf.set(PigInputFormat.PIG_INPUTS, ObjectSerializer.serialize(inp));
+            conf.set(PigInputFormat.PIG_INPUT_TARGETS, ObjectSerializer.serialize(inpTargets));
+            conf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(inpSignatureLists));
+            conf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(inpLimits));
             String tmp;
             long maxCombinedSplitSize = 0;
             if (!tezOp.combineSmallSplits() || pc.getProperties().getProperty(PigConfiguration.PIG_SPLIT_COMBINATION, "true").equals("false"))

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigInputFormatTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigInputFormatTez.java?rev=1709122&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigInputFormatTez.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigInputFormatTez.java Fri Oct 16 22:44:34 2015
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
+
+import static org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigOutputFormatTez.resetUDFContextForThreadReuse;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.data.Tuple;
+
+public class PigInputFormatTez extends PigInputFormat {
+
+    @Override
+    public RecordReader<Text, Tuple> createRecordReader(InputSplit split,
+            TaskAttemptContext context) throws IOException,
+            InterruptedException {
+        resetUDFContextForThreadReuse();
+        return super.createRecordReader(split, context);
+    }
+
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java?rev=1709122&r1=1709121&r2=1709122&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java Fri Oct 16 22:44:34 2015
@@ -20,21 +20,34 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.UDFContext;
 
 public class PigOutputFormatTez extends PigOutputFormat {
 
+
+    @Override
+    public RecordWriter<WritableComparable, Tuple> getRecordWriter(
+            TaskAttemptContext taskattemptcontext) throws IOException,
+            InterruptedException {
+        resetUDFContextForThreadReuse();
+        return super.getRecordWriter(taskattemptcontext);
+    }
+
     @Override
     public OutputCommitter getOutputCommitter(
             TaskAttemptContext taskattemptcontext) throws IOException,
             InterruptedException {
+        resetUDFContextForThreadReuse();
         setupUdfEnvAndStores(taskattemptcontext);
 
         // we return an instance of PigOutputCommitterTez (PIG-4202) to Hadoop - this instance
@@ -44,6 +57,21 @@ public class PigOutputFormatTez extends
                 reduceStores);
     }
 
+    public static void resetUDFContextForThreadReuse() {
+        // On the Tez AM, MROutput OutputCommitters are initialized and setupJob
+        // is called on them in a loop in the same thread, while commitJob/abortJob
+        // can be called from any thread based on events received from vertices.
+
+        // On the Tez tasks, different inputs/outputs are initialized in different
+        // Initializer threads submitted to a thread pool. Even though the pool size
+        // is numInputs + numOutputs, a thread can be reused.
+
+        // Since the UDFContext deserialized from an input or output payload contains
+        // information only for that input or output (to reduce payload size), we need to
+        // ensure it is deserialized every time before use in a thread to get the right one.
+        UDFContext.getUDFContext().reset();
+    }
+
     public static class PigOutputCommitterTez extends PigOutputCommitter {
 
         public PigOutputCommitterTez(TaskAttemptContext context,
@@ -54,39 +82,35 @@ public class PigOutputFormatTez extends
 
         @Override
         public void setupJob(JobContext context) throws IOException {
-            cleanupForContainerReuse();
+            resetUDFContextForThreadReuse();
             try {
                 super.setupJob(context);
             } finally {
-                cleanupForContainerReuse();
+                resetUDFContextForThreadReuse();
             }
 
         }
 
         @Override
         public void commitJob(JobContext context) throws IOException {
-            cleanupForContainerReuse();
+            resetUDFContextForThreadReuse();
             try {
                 super.commitJob(context);
             } finally {
-                cleanupForContainerReuse();
+                resetUDFContextForThreadReuse();
             }
         }
 
         @Override
         public void abortJob(JobContext context, State state)
                 throws IOException {
-            cleanupForContainerReuse();
+            resetUDFContextForThreadReuse();
             try {
                 super.abortJob(context, state);
             } finally {
-                cleanupForContainerReuse();
+                resetUDFContextForThreadReuse();
             }
         }
 
-        private void cleanupForContainerReuse() {
-            UDFContext.getUDFContext().reset();
-        }
-
     }
 }
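
The comment block in resetUDFContextForThreadReuse() above describes the hazard this
method guards against: UDFContext is thread-local, and once payloads carry only
per-input or per-output slices, a pooled thread that served one payload must not hand
its leftover context to the next. A hedged sketch of the pattern the reset enables
(reset(), addJobConf() and deserialize() are existing UDFContext methods; the
surrounding flow is illustrative, not code from this patch):

    // Two Tez initializers may run back to back on the same pooled thread.
    UDFContext udfc = UDFContext.getUDFContext();
    udfc.reset();          // drop whatever the previous payload left behind
    udfc.addJobConf(conf); // attach this payload's configuration
    udfc.deserialize();    // repopulate from this payload's slice only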

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezUDFContextSeparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezUDFContextSeparator.java?rev=1709122&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezUDFContextSeparator.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezUDFContextSeparator.java Fri Oct 16 22:44:34 2015
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.util;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.UDFContextSeparator;
+import org.apache.pig.impl.util.UDFContextSeparator.UDFType;
+
+public class TezUDFContextSeparator extends TezOpPlanVisitor{
+
+    private UDFContextSeparator udfContextSeparator;
+
+    public TezUDFContextSeparator(TezOperPlan plan,
+            PlanWalker<TezOperator, TezOperPlan> walker) {
+        super(plan, walker);
+        udfContextSeparator = new UDFContextSeparator();
+    }
+
+    @Override
+    public void visitTezOp(TezOperator tezOperator) throws VisitorException {
+        if (!tezOperator.isVertexGroup()) {
+            udfContextSeparator.setPlan(tezOperator.plan, tezOperator.getOperatorKey().toString());
+            udfContextSeparator.visit();
+
+            for (Entry<OperatorKey, TezEdgeDescriptor> entry : tezOperator.outEdges.entrySet()) {
+                PhysicalPlan combinePlan = entry.getValue().combinePlan;
+                if (!combinePlan.isEmpty()) {
+                    udfContextSeparator.setPlan(combinePlan,
+                            tezOperator.getOperatorKey().toString() + "-" + entry.getKey().toString());
+                    udfContextSeparator.visit();
+                }
+            }
+        }
+    }
+
+    public void serializeUDFContext(Configuration conf, TezOperator tezOp) throws IOException {
+        // Serialize all - LoadFunc, StoreFunc, UserFunc
+        udfContextSeparator.serializeUDFContext(conf, tezOp.getOperatorKey().toString(), UDFType.values());
+    }
+
+    public void serializeUDFContext(Configuration conf, TezOperator tezOp,
+            UDFType udfType) throws IOException {
+        udfContextSeparator.serializeUDFContext(conf, tezOp.getOperatorKey().toString(), udfType);
+    }
+
+    public void serializeUDFContextForEdge(Configuration conf,
+            TezOperator from, TezOperator to, UDFType udfType) throws IOException {
+        udfContextSeparator.serializeUDFContext(conf,
+                from.getOperatorKey().toString() + "-" + to.getOperatorKey().toString(), udfType);
+    }
+
+    public void serializeUDFContext(Configuration conf, TezOperator tezOp,
+            POStore store) throws IOException {
+        udfContextSeparator.serializeUDFContext(conf, tezOp.getOperatorKey().toString(), store);
+    }
+
+}

Modified: pig/trunk/src/org/apache/pig/impl/util/UDFContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/UDFContext.java?rev=1709122&r1=1709121&r2=1709122&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/UDFContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/UDFContext.java Fri Oct 16 22:44:34 2015
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
@@ -33,8 +35,9 @@ public class UDFContext {
     private Configuration jconf = null;
     private HashMap<UDFContextKey, Properties> udfConfs;
     private Properties clientSysProps;
-    private static final String CLIENT_SYS_PROPS = "pig.client.sys.props";
-    private static final String UDF_CONTEXT = "pig.udf.context";
+
+    static final String CLIENT_SYS_PROPS = "pig.client.sys.props";
+    static final String UDF_CONTEXT = "pig.udf.context";
 
     private static ThreadLocal<UDFContext> tss = new ThreadLocal<UDFContext>() {
         @Override
@@ -81,6 +84,14 @@ public class UDFContext {
     /*
      *  internal pig use only - should NOT be called from user code
      */
+    HashMap<UDFContextKey, Properties> getUdfConfs() {
+        return udfConfs;
+    }
+
+
+    /*
+     *  internal pig use only - should NOT be called from user code
+     */
     public void setClientSystemProps(Properties properties) {
         clientSysProps = properties;
     }
@@ -197,10 +208,19 @@ public class UDFContext {
      * @throws IOException if underlying serialization throws it
      */
     public void serialize(Configuration conf) throws IOException {
+        // Minor optimization. Remove empty properties before serialization.
+        Iterator<Entry<UDFContextKey, Properties>> iter = udfConfs.entrySet().iterator();
+        while (iter.hasNext()) {
+            Entry<UDFContextKey, Properties> entry = iter.next();
+            if (entry.getValue().isEmpty()) {
+                iter.remove();
+            }
+        }
         conf.set(UDF_CONTEXT, ObjectSerializer.serialize(udfConfs));
         conf.set(CLIENT_SYS_PROPS, ObjectSerializer.serialize(clientSysProps));
     }
 
+
     /**
      * Populate the udfConfs field.  This function is intended to
      * be called by Map.configure or Reduce.configure on the backend.
@@ -255,23 +275,31 @@ public class UDFContext {
      *  it holds the class and args of the udf, and
      *  implements equals() and hashCode()
      */
-    private static class UDFContextKey implements Serializable{
+    static class UDFContextKey implements Serializable{
 
         private static final long serialVersionUID = 1;
         private String className;
         private String[] args;
 
-        UDFContextKey(){
-        }
-
         UDFContextKey(String className, String [] args){
             this.className = className;
             this.args = args;
         }
 
-        /* (non-Javadoc)
-         * @see java.lang.Object#hashCode()
-         */
+        String getClassName() {
+            return className;
+        }
+
+        String[] getArgs() {
+            return args;
+        }
+
+        @Override
+        public String toString() {
+            return "UDFContextKey [className=" + className + ", args="
+                    + Arrays.toString(args) + "]";
+        }
+
         @Override
         public int hashCode() {
             final int prime = 31;
@@ -282,9 +310,6 @@ public class UDFContext {
             return result;
         }
 
-        /* (non-Javadoc)
-         * @see java.lang.Object#equals(java.lang.Object)
-         */
         @Override
         public boolean equals(Object obj) {
             if (this == obj)

Added: pig/trunk/src/org/apache/pig/impl/util/UDFContextSeparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/UDFContextSeparator.java?rev=1709122&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/UDFContextSeparator.java (added)
+++ pig/trunk/src/org/apache/pig/impl/util/UDFContextSeparator.java Fri Oct 16 22:44:34 2015
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.Algebraic;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.UDFContext.UDFContextKey;
+
+public class UDFContextSeparator extends PhyPlanVisitor {
+
+    public static enum UDFType {
+        LOADFUNC,
+        STOREFUNC,
+        USERFUNC,
+    };
+
+    private String planOpKey;
+    private DepthFirstWalker<PhysicalOperator, PhysicalPlan> dfw;
+    private Map<String, Map<Enum<UDFType>, List<UDFContext.UDFContextKey>>> udfContextsPerPlan;
+    private UDFContext udfContext;
+    private Set<UDFContext.UDFContextKey> allKeys;
+    private Set<UDFContext.UDFContextKey> knownKeys;
+    private Set<UDFContext.UDFContextKey> unKnownKeys;
+    private Set<UDFContext.UDFContextKey> algebraicUDFKeys;
+
+    public UDFContextSeparator(){
+        super(null, null);
+        dfw = new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(null);
+        udfContext = UDFContext.getUDFContext();
+        allKeys = udfContext.getUdfConfs().keySet();
+        knownKeys = new HashSet<UDFContext.UDFContextKey>();
+        algebraicUDFKeys = new HashSet<UDFContext.UDFContextKey>();
+        udfContextsPerPlan = new HashMap<String, Map<Enum<UDFType>, List<UDFContext.UDFContextKey>>>();
+    }
+
+    public Set<UDFContext.UDFContextKey> getUnKnownKeys() {
+        if (unKnownKeys == null) {
+            unKnownKeys = new HashSet<UDFContext.UDFContextKey>(allKeys);
+            unKnownKeys.removeAll(knownKeys);
+            for (Entry<UDFContextKey, Properties> entry : udfContext.getUdfConfs().entrySet()) {
+                if (entry.getValue().isEmpty()) {
+                    // Remove empty values
+                    unKnownKeys.remove(entry.getKey());
+                }
+            }
+        }
+        return unKnownKeys;
+    }
+
+    public void setPlan(PhysicalPlan plan, String planOpKey){
+        mPlan = plan;
+        dfw.setPlan(plan);
+        mCurrentWalker = dfw;
+        this.planOpKey = planOpKey;
+        this.udfContextsPerPlan.put(planOpKey, new HashMap<Enum<UDFType>, List<UDFContextKey>>());
+    }
+
+    @Override
+    public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
+        if (userFunc.getFunc() instanceof Algebraic) {
+            for (UDFContext.UDFContextKey key : allKeys) {
+                if (key.getClassName().equals(userFunc.getFunc().getClass().getName())) {
+                    // If Algebraic, handle it differently to be on the safer side,
+                    // as the user might be accessing properties by the base class
+                    // name instead of by the Initial, Intermediate and Final classes.
+                    algebraicUDFKeys.add(key);
+                }
+            }
+        } else {
+            findAndAddKeys(userFunc.getFunc().getClass().getName(),
+                    userFunc.getSignature(), UDFType.USERFUNC);
+        }
+    }
+
+    @Override
+    public void visitLoad(POLoad ld) throws VisitorException {
+        findAndAddKeys(ld.getLoadFunc().getClass().getName(),
+                ld.getSignature(), UDFType.LOADFUNC);
+    }
+
+
+    @Override
+    public void visitStore(POStore st) throws VisitorException {
+        findAndAddKeys(st.getStoreFunc().getClass().getName(),
+                st.getSignature(), UDFType.STOREFUNC);
+    }
+
+    @Override
+    public void visitCast(POCast op) {
+        if (op.getFuncSpec() != null) {
+            findAndAddKeys(op.getFuncSpec().getClassName(),
+                    null, UDFType.USERFUNC);
+        }
+    }
+
+    private void findAndAddKeys(String keyClassName, String signature, UDFType udfType) {
+        for (UDFContext.UDFContextKey key : allKeys) {
+            if (key.getClassName().equals(keyClassName)
+                    && (key.getArgs() == null
+                    || signature == null
+                    || Arrays.asList(key.getArgs()).contains(signature))) {
+                Map<Enum<UDFType>, List<UDFContextKey>> udfKeysByType = udfContextsPerPlan
+                        .get(planOpKey);
+                List<UDFContextKey> keyList = udfContextsPerPlan.get(planOpKey)
+                        .get(udfType);
+                if (keyList == null) {
+                    keyList = new ArrayList<UDFContext.UDFContextKey>();
+                    udfKeysByType.put(udfType, keyList);
+                }
+                keyList.add(key);
+                knownKeys.add(key);
+            }
+        }
+    }
+
+    public void serializeUDFContext(Configuration conf, String planOpKey,
+            UDFType... udfTypes) throws IOException {
+        Map<UDFContextKey, Properties> udfConfs = udfContext.getUdfConfs();
+        HashMap<UDFContextKey, Properties> udfConfsToSerialize = new HashMap<UDFContextKey, Properties>();
+        Map<Enum<UDFType>, List<UDFContextKey>> udfKeysByType = udfContextsPerPlan.get(planOpKey);
+        if (udfKeysByType != null) {
+            for (UDFType udfType : udfTypes) {
+                List<UDFContextKey> keyList = udfContextsPerPlan.get(planOpKey).get(udfType);
+                if (keyList != null) {
+                    for (UDFContextKey key : keyList) {
+                        udfConfsToSerialize.put(key, udfConfs.get(key));
+                    }
+                }
+                if (udfType.equals(UDFType.USERFUNC)) {
+                    for (UDFContextKey key : algebraicUDFKeys) {
+                        udfConfsToSerialize.put(key, udfConfs.get(key));
+                    }
+                }
+            }
+        }
+        serialize(conf, udfConfsToSerialize);
+    }
+
+    public void serializeUDFContext(Configuration conf, String planOpKey,
+            POStore store) throws IOException {
+        Map<UDFContextKey, Properties> udfConfs = udfContext.getUdfConfs();
+        HashMap<UDFContextKey, Properties> udfConfsToSerialize = new HashMap<UDFContextKey, Properties>();
+        // Find keys specific to just this StoreFunc
+        Map<Enum<UDFType>, List<UDFContextKey>> udfKeysByType = udfContextsPerPlan.get(planOpKey);
+        if (udfKeysByType != null) {
+            List<UDFContextKey> keyList = udfContextsPerPlan.get(planOpKey).get(
+                    UDFType.STOREFUNC);
+            if (keyList != null) {
+                String keyClassName = store.getStoreFunc().getClass().getName();
+                String signature = store.getSignature();
+                for (UDFContextKey key : keyList) {
+                    if (key.getClassName().equals(keyClassName)
+                            && (key.getArgs() == null
+                            || Arrays.asList(key.getArgs()).contains(signature))) {
+                        udfConfsToSerialize.put(key, udfConfs.get(key));
+                    }
+                }
+            }
+        }
+        serialize(conf, udfConfsToSerialize);
+    }
+
+    private void serialize(Configuration conf,
+            HashMap<UDFContextKey, Properties> udfConfsToSerialize)
+            throws IOException {
+        HashMap<UDFContextKey, Properties> udfConfs = udfContext.getUdfConfs();
+        // Add unknown ones for serialization
+        for (UDFContextKey key : getUnKnownKeys()) {
+            udfConfsToSerialize.put(key, udfConfs.get(key));
+        }
+        conf.set(UDFContext.UDF_CONTEXT, ObjectSerializer.serialize(udfConfsToSerialize));
+        conf.set(UDFContext.CLIENT_SYS_PROPS, ObjectSerializer.serialize(udfContext.getClientSystemProps()));
+    }
+
+}
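
Note the two conservative fallbacks in UDFContextSeparator above: keys of Algebraic UDFs
are always shipped whenever USERFUNC entries are requested (a user may key properties off
the base class name rather than the Initial, Intermediate and Final classes), and any key
the plan walk could not attribute to an operator is included in every payload via
getUnKnownKeys(). Schematically, with typedKeys standing in for the per-type key lists
(a hypothetical local, not an identifier from the patch):

    Set<UDFContextKey> payloadKeys = new HashSet<UDFContextKey>(typedKeys);
    payloadKeys.addAll(algebraicUDFKeys); // Algebraic UDFs: always included for USERFUNC
    payloadKeys.addAll(getUnKnownKeys()); // unattributed keys: shipped everywhere, trading size for safety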

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1709122&r1=1709121&r2=1709122&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Fri Oct 16 22:44:34 2015
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.mapred.Counters;
 import org.apache.pig.PigCounters;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -134,7 +135,7 @@ public class TezVertexStats extends JobS
             this.stores = (List<POStore>) ObjectSerializer.deserialize(
                     conf.get(JobControlCompiler.PIG_REDUCE_STORES));
             this.loads = (List<FileSpec>) ObjectSerializer.deserialize(
-                    conf.get("pig.inputs"));
+                    conf.get(PigInputFormat.PIG_INPUTS));
         } catch (IOException e) {
             LOG.warn("Failed to deserialize the store list", e);
         }


