pig-commits mailing list archives

From jcove...@apache.org
Subject svn commit: r1356921 [1/4] - in /pig/trunk: ./ conf/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expr...
Date Tue, 03 Jul 2012 20:36:16 GMT
Author: jcoveney
Date: Tue Jul  3 20:36:09 2012
New Revision: 1356921

URL: http://svn.apache.org/viewvc?rev=1356921&view=rev
Log:
[PIG-2632] Create a SchemaTuple which generates efficient Tuples via code gen (jcoveney)

Added:
    pig/trunk/src/org/apache/pig/data/AppendableSchemaTuple.java
    pig/trunk/src/org/apache/pig/data/FieldIsNullException.java
    pig/trunk/src/org/apache/pig/data/SchemaTuple.java
    pig/trunk/src/org/apache/pig/data/SchemaTupleBackend.java
    pig/trunk/src/org/apache/pig/data/SchemaTupleClassGenerator.java
    pig/trunk/src/org/apache/pig/data/SchemaTupleFactory.java
    pig/trunk/src/org/apache/pig/data/SchemaTupleFrontend.java
    pig/trunk/src/org/apache/pig/data/TupleMaker.java
    pig/trunk/src/org/apache/pig/data/utils/
    pig/trunk/src/org/apache/pig/data/utils/BytesHelper.java
    pig/trunk/src/org/apache/pig/data/utils/MethodHelper.java
    pig/trunk/src/org/apache/pig/data/utils/SedesHelper.java
    pig/trunk/src/org/apache/pig/data/utils/StructuresHelper.java
    pig/trunk/test/org/apache/pig/data/
    pig/trunk/test/org/apache/pig/data/TestSchemaTuple.java
    pig/trunk/test/org/apache/pig/data/utils/
    pig/trunk/test/org/apache/pig/data/utils/TestMethodHelper.java
Removed:
    pig/trunk/src/org/apache/pig/data/PBooleanTuple.java
    pig/trunk/src/org/apache/pig/data/PDoubleTuple.java
    pig/trunk/src/org/apache/pig/data/PFloatTuple.java
    pig/trunk/src/org/apache/pig/data/PIntTuple.java
    pig/trunk/src/org/apache/pig/data/PLongTuple.java
    pig/trunk/src/org/apache/pig/data/PStringTuple.java
    pig/trunk/src/org/apache/pig/data/PrimitiveFieldTuple.java
    pig/trunk/src/org/apache/pig/data/PrimitiveTuple.java
    pig/trunk/test/org/apache/pig/test/TestPrimitiveFieldTuple.java
    pig/trunk/test/org/apache/pig/test/TestPrimitiveTuple.java
Modified:
    pig/trunk/.gitignore
    pig/trunk/CHANGES.txt
    pig/trunk/conf/pig.properties
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
    pig/trunk/src/org/apache/pig/builtin/mock/Storage.java
    pig/trunk/src/org/apache/pig/data/BinInterSedes.java
    pig/trunk/src/org/apache/pig/data/BinSedesTupleFactory.java
    pig/trunk/src/org/apache/pig/data/DataByteArray.java
    pig/trunk/src/org/apache/pig/data/TupleFactory.java
    pig/trunk/src/org/apache/pig/data/TypeAwareTuple.java
    pig/trunk/src/org/apache/pig/impl/PigContext.java
    pig/trunk/src/org/apache/pig/impl/io/InterRecordReader.java
    pig/trunk/src/org/apache/pig/impl/io/NullableTuple.java
    pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/GroupByConstParallelSetter.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
    pig/trunk/test/org/apache/pig/test/TestDataBag.java
    pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
    pig/trunk/test/org/apache/pig/test/TestSchema.java

Modified: pig/trunk/.gitignore
URL: http://svn.apache.org/viewvc/pig/trunk/.gitignore?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/.gitignore (original)
+++ pig/trunk/.gitignore Tue Jul  3 20:36:09 2012
@@ -14,3 +14,4 @@ pig-withouthadoop.jar
 *.orig
 *.rej
 *.class
+*.classpath

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Jul  3 20:36:09 2012
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2632: Create a SchemaTuple which generates efficient Tuples via code gen (jcoveney)
+
 PIG-2750: add artifacts to the ivy.xml for other jars Pig generates (julien)
 
 PIG-2748: Change the names of the jar produced in the build folder to match maven conventions (julien)

Modified: pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/trunk/conf/pig.properties?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/conf/pig.properties (original)
+++ pig/trunk/conf/pig.properties Tue Jul  3 20:36:09 2012
@@ -67,6 +67,44 @@
 #pig.sql.type=hcat
 hcat.bin=/usr/local/hcat/bin/hcat
 
+############################ SchemaTuple ############################
+
+# Setting this value will turn on the SchemaTuple feature (PIG-2632)
+# This will attempt to use code generation for more efficient Tuple
+# processing within the Pig code. This can lead to CPU, serialization, and
+# memory benefits (currently, the potential memory benefits are the largest).
+
+# This parameter will enable the optimization in all available cases
+#pig.schematuple=true
+
+# Certain cases can be turned off by uncommenting the following. These will
+# all be off by default, but will all be turned on if pig.schematuple is set
+# to true.
+
+# This will disable SchemaTuples in the case of UDFs. Currently,
+# the input to UDFs will be SchemaTuples.
+
+#pig.schematuple.udf=false
+
+# This is currently not implemented. In the future, LoadFuncs with known
+# schemas should output SchemaTuples.
+
+#pig.schematuple.load=false
+
+# This will use SchemaTuples in replicated joins. The potential memory saving
+# here is significant. It will use SchemaTuples when it builds the HashMap
+# from join keys to related values.
+
+#pig.schematuple.fr_join=false
+
+# In the current implementation of merge join, all of the Tuples in the left relation
+# that share a given key will be stored in a List in memory. This will use SchemaTuples
+# instead in that List.
+
+#pig.schematuple.merge_join=false
+
+#####################################################################
+
 ##### Set up optional Pig Progress Notification Listener ############
 
 # Note that only one PPNL can be set up. If you need several, write a PPNL that will chain them.

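For illustration, these flags can also be supplied programmatically; a minimal
sketch using the standard PigServer API (the property names are the ones added
above; the script and input path are placeholders):

    import java.util.Properties;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class SchemaTupleConfigDemo {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Master switch: enables the optimization in all available cases
            props.setProperty("pig.schematuple", "true");
            // Individual cases can still be controlled separately
            props.setProperty("pig.schematuple.fr_join", "true");
            PigServer pig = new PigServer(ExecType.LOCAL, props);
            pig.registerQuery("a = LOAD 'input' AS (x:int, y:chararray);");
        }
    }
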
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Jul  3 20:36:09 2012
@@ -72,6 +72,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.SchemaTupleFrontend;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
@@ -97,7 +98,6 @@ import org.apache.pig.impl.util.UDFConte
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.ScriptState;
 
-
 /**
  * This is the compiler class that takes an MROperPlan and converts
  * it into a JobControl object with the relevant dependency info
@@ -581,6 +581,8 @@ public class JobControlCompiler{
             // distributed cache.
             setupDistributedCacheForUdfs(mro, pigContext, conf);
 
+            SchemaTupleFrontend.copyAllGeneratedToDistributedCache(pigContext, conf);
+
             POPackage pack = null;
             if(mro.reducePlan.isEmpty()){
                 //MapOnly Job

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Tue Jul  3 20:36:09 2012
@@ -21,17 +21,14 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -43,8 +40,9 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.data.Tuple;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.SchemaTupleBackend;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.PigNullableWritable;
@@ -52,8 +50,8 @@ import org.apache.pig.impl.plan.Dependen
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.impl.util.SpillableMemoryManager;
 import org.apache.pig.impl.util.Pair;
+import org.apache.pig.impl.util.SpillableMemoryManager;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 /**
@@ -170,6 +168,10 @@ public abstract class PigGenericMapBase 
         
         PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
         pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
+
+        // This attempts to fetch all of the generated code from the distributed cache, and resolve it
+        SchemaTupleBackend.initialize(job, pigContext.getExecType());
+
         if (pigContext.getLog4jProperties()!=null)
             PropertyConfigurator.configure(pigContext.getLog4jProperties());
         

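Taken together with the JobControlCompiler change above, this sets up a
two-step handshake; a condensed sketch (both calls are taken verbatim from the
hunks, with the surrounding context summarized in comments):

    // Frontend, while compiling the job (JobControlCompiler.getJob):
    // ship every generated SchemaTuple class alongside the job.
    SchemaTupleFrontend.copyAllGeneratedToDistributedCache(pigContext, conf);

    // Backend, in each task's setup (PigGenericMapBase.setup, and likewise
    // in PigGenericMapReduce below): fetch and resolve the generated classes
    // so that factory lookups by Schema can succeed at runtime.
    SchemaTupleBackend.initialize(job, pigContext.getExecType());
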
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Tue Jul  3 20:36:09 2012
@@ -18,25 +18,18 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Collections;
-import java.util.Comparator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
@@ -50,9 +43,9 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.pen.FakeRawKeyValueIterator;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.NullablePartitionWritable;
@@ -61,9 +54,9 @@ import org.apache.pig.impl.io.PigNullabl
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.SpillableMemoryManager;
 import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.impl.util.Pair;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 /**
@@ -320,6 +313,9 @@ public class PigGenericMapReduce {
                 PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
                 pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
                 
+                // This attempts to fetch all of the generated code from the distributed cache, and resolve it
+                SchemaTupleBackend.initialize(jConf, pigContext.getExecType());
+
                 if (rp == null)
                     rp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
                             .get("pig.reducePlan"));

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java Tue Jul  3 20:36:09 2012
@@ -23,12 +23,11 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.JobConf;
-
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BinInterSedes;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -41,10 +40,8 @@ public class PigTupleDefaultRawComparato
     private final Log mLog = LogFactory.getLog(getClass());
     private boolean[] mAsc;
     private boolean mWholeTuple;
-    private TupleFactory mFact;
     private boolean mHasNullField;
 
-    @SuppressWarnings("unchecked")
     public PigTupleDefaultRawComparator() {
         super(TupleFactory.getInstance().tupleClass());
     }
@@ -68,7 +65,6 @@ public class PigTupleDefaultRawComparato
         // If there's only one entry in mAsc, it means it's for the whole
         // tuple. So we can't be looking for each column.
         mWholeTuple = (mAsc.length == 1);
-        mFact = TupleFactory.getInstance();
     }
 
     public Configuration getConf() {
@@ -80,6 +76,8 @@ public class PigTupleDefaultRawComparato
         return mHasNullField;
     }
     
+    private static final BinInterSedes bis = new BinInterSedes();
+
     /**
      * Compare two NullableTuples as raw bytes. If neither are null, then
      * IntWritable.compare() is used. If both are null then the indices are
@@ -89,21 +87,22 @@ public class PigTupleDefaultRawComparato
         int rc = 0;
         mHasNullField = false;
 
+        Tuple t1;
+        Tuple t2;
+
         // This can't be done on the raw data. Users are allowed to
         // implement their own versions of tuples, which means we have no
         // idea what the underlying representation is. So step one is to
         // instantiate each object as a tuple.
-        Tuple t1 = mFact.newTuple();
-        Tuple t2 = mFact.newTuple();
         try {
-            t1.readFields(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
-            t2.readFields(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
+            t1 = bis.readTuple(new DataInputStream(new ByteArrayInputStream(b1, s1, l1)));
+            t2 = bis.readTuple(new DataInputStream(new ByteArrayInputStream(b2, s2, l2)));
         } catch (IOException ioe) {
             mLog.error("Unable to instantiate tuples for comparison: " + ioe.getMessage());
             throw new RuntimeException(ioe.getMessage(), ioe);
         }
 
-        rc = compareTuple(t1, t2);
+        rc = compareTuple(t1, t2); //TODO think about how SchemaTuple could speed this up
 
         return rc;
     }

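A minimal sketch of the new deserialization path (the one-argument readTuple
used above reads the leading type byte itself; the stream setup mirrors the
comparator):

    BinInterSedes bis = new BinInterSedes();
    Tuple t1 = bis.readTuple(new DataInputStream(
            new ByteArrayInputStream(b1, s1, l1)));
    // The type byte selects the concrete Tuple implementation, so a
    // serialized SchemaTuple can be reconstituted directly instead of going
    // through TupleFactory.newTuple() followed by readFields().
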
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Tue Jul  3 20:36:09 2012
@@ -25,22 +25,21 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.parser.SourceLocation;
-import org.apache.pig.pen.util.LineageTracer;
-import org.apache.pig.pen.Illustrator;
 import org.apache.pig.pen.Illustrable;
+import org.apache.pig.pen.Illustrator;
+import org.apache.pig.pen.util.LineageTracer;
 
 /**
  *

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Tue Jul  3 20:36:09 2012
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
@@ -41,8 +43,11 @@ import org.apache.pig.builtin.MonitoredU
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
+import org.apache.pig.data.SchemaTupleFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.data.TupleMaker;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -51,6 +56,7 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.impl.util.UDFContext;
 
 public class POUserFunc extends ExpressionOperator {
+    private static final Log LOG = LogFactory.getLog(POUserFunc.class);
 
     /**
      *
@@ -129,6 +135,9 @@ public class POUserFunc extends Expressi
         this.func.setPigLogger(pigLogger);
     }
 
+    private transient TupleMaker inputTupleMaker;
+    private boolean usingSchemaTupleFactory;
+
     @Override
     public Result processInput() throws ExecException {
 
@@ -139,6 +148,31 @@ public class POUserFunc extends Expressi
         if(!initialized) {
             func.setReporter(reporter);
             func.setPigLogger(pigLogger);
+
+            // We initialize here instead of instantiateFunc because this is called
+            // when actual processing has begun, whereas a function can be instantiated
+            // on the frontend potentially (mainly for optimization)
+            Schema tmpS = func.getInputSchema();
+            if (tmpS != null) {
+                //Currently, getInstanceForSchema returns null if no class was found. This works fine...
+                //if it is null, the default will be used. We pass the context because if it happens that
+                //the same Schema was generated elsewhere, we do not want to override user expectations
+                inputTupleMaker = SchemaTupleFactory.getInstance(tmpS, false, GenContext.UDF);
+                if (inputTupleMaker == null) {
+                    LOG.debug("No SchemaTupleFactory found for Schema ["+tmpS+"], using default TupleFactory");
+                    usingSchemaTupleFactory = false;
+                } else {
+                    LOG.debug("Using SchemaTupleFactory for Schema: " + tmpS);
+                    usingSchemaTupleFactory = true;
+                }
+
+                //In the future, we could optionally use SchemaTuples for output as well
+            }
+
+            if (inputTupleMaker == null) {
+                inputTupleMaker = TupleFactory.getInstance();
+            }
+
             initialized = true;
         }
 
@@ -162,9 +196,14 @@ public class POUserFunc extends Expressi
             detachInput();
             return res;
         } else {
-            res.result = TupleFactory.getInstance().newTuple();
+            // We decouple these because there may be cases where the size is known
+            // even though a SchemaTupleFactory is not being used
+            boolean knownSize = usingSchemaTupleFactory;
+            int knownIndex = 0;
+            res.result = inputTupleMaker.newTuple();
 
             Result temp = null;
+
             for(PhysicalOperator op : inputs) {
                 temp = op.getNext(getDummy(op.getResultType()), op.getResultType());
                 if(temp.returnStatus!=POStatus.STATUS_OK) {
@@ -178,14 +217,23 @@ public class POUserFunc extends Expressi
                         Tuple trslt = (Tuple) temp.result;
                         Tuple rslt = (Tuple) res.result;
                         for(int i=0;i<trslt.size();i++) {
+                            if (knownSize) {
+                                rslt.set(knownIndex++, trslt.get(i));
+                            } else {
                             rslt.append(trslt.get(i));
                         }
+                        }
                         continue;
                     }
                 }
+                if (knownSize) {
+                    ((Tuple)res.result).set(knownIndex++, temp.result);
+                } else {
                 ((Tuple)res.result).append(temp.result);
             }
+            }
             res.returnStatus = temp.returnStatus;
+
             return res;
         }
     }

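The factory selection above follows a fallback pattern that recurs throughout
this commit; a condensed sketch (the names are from the hunk above, with the
schema variable assumed to be non-null):

    TupleMaker maker = SchemaTupleFactory.getInstance(schema, false, GenContext.UDF);
    if (maker == null) {
        // No generated class exists for this schema in this context,
        // so fall back to the default TupleFactory.
        maker = TupleFactory.getInstance();
    }
    Tuple input = maker.newTuple();
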
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Tue Jul  3 20:36:09 2012
@@ -18,7 +18,6 @@
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -29,33 +28,34 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.Path;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin.TuplesToSchemaTupleList;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.SchemaTupleBackend;
+import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
+import org.apache.pig.data.SchemaTupleFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.ObjectSerializer;
+
+import com.google.common.collect.Lists;
 
 /**
  * The operator models the join keys using the Local Rearrange operators which
@@ -94,7 +94,7 @@ public class POFRJoin extends PhysicalOp
     // The array of Hashtables one per replicated input. replicates[fragment] =
     // null
     // fragment is the input which is fragmented and not replicated.
-    private Map<Tuple, List<Tuple>> replicates[];
+    private TupleToMapKey replicates[];
     // variable which denotes whether we are returning tuples from the foreach
     // operator
     private boolean processingPlan;
@@ -109,11 +109,22 @@ public class POFRJoin extends PhysicalOp
 
     // This list contains nullTuples according to schema of various inputs 
     private DataBag nullBag;
+    private List<Schema> inputSchemas;
+    private List<Schema> keySchemas;
+
+    public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp,
+            List<List<PhysicalPlan>> ppLists, List<List<Byte>> keyTypes,
+            FileSpec[] replFiles, int fragment, boolean isLeftOuter,
+            Tuple nullTuple) throws ExecException {
+        this(k, rp, inp, ppLists, keyTypes, replFiles, fragment, isLeftOuter, nullTuple, null, null);
+    }
 
     public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp,
             List<List<PhysicalPlan>> ppLists, List<List<Byte>> keyTypes,
             FileSpec[] replFiles, int fragment, boolean isLeftOuter,
-            Tuple nullTuple)
+            Tuple nullTuple,
+            List<Schema> inputSchemas,
+            List<Schema> keySchemas)
             throws ExecException {
         super(k, rp, inp);
 
@@ -121,7 +132,7 @@ public class POFRJoin extends PhysicalOp
         this.fragment = fragment;
         this.keyTypes = keyTypes;
         this.replFiles = replFiles;
-        replicates = new Map[ppLists.size()];
+        replicates = new TupleToMapKey[ppLists.size()];
         LRs = new POLocalRearrange[ppLists.size()];
         constExps = new ConstantExpression[ppLists.size()];
         createJoinPlans(k);
@@ -131,6 +142,22 @@ public class POFRJoin extends PhysicalOp
         tupList.add(nullTuple);
         nullBag = new NonSpillableDataBag(tupList);
         this.isLeftOuterJoin = isLeftOuter;
+        if (inputSchemas != null) {
+            this.inputSchemas = inputSchemas;
+        } else {
+            this.inputSchemas = Lists.newArrayListWithCapacity(replFiles.length);
+            for (int i = 0; i < replFiles.length; i++) {
+                this.inputSchemas.add(null);
+            }
+        }
+        if (keySchemas != null) {
+            this.keySchemas = keySchemas;
+        } else {
+            this.keySchemas = Lists.newArrayListWithCapacity(replFiles.length);
+            for (int i = 0; i < replFiles.length; i++) {
+                this.keySchemas.add(null);
+            }
+        }
     }
 
     public List<List<PhysicalPlan>> getJoinPlans() {
@@ -269,15 +296,15 @@ public class POFRJoin extends PhysicalOp
                     ce.setValue(value);
                     continue;
                 }
-                Map<Tuple, List<Tuple>> replicate = replicates[i];
-                if (!replicate.containsKey(key)) {
+                TupleToMapKey replicate = replicates[i];
+                if (replicate.get(key) == null) {
                     if (isLeftOuterJoin) {
                         ce.setValue(nullBag);
                     }
                     noMatch = true;
                     break;
                 }
-                ce.setValue(new NonSpillableDataBag(replicate.get(key)));
+                ce.setValue(new NonSpillableDataBag(replicate.get(key).getList()));
             }
 
             // If this is not LeftOuter Join and there was no match we
@@ -299,6 +326,30 @@ public class POFRJoin extends PhysicalOp
         }
     }
 
+    private static class TupleToMapKey {
+        private HashMap<Tuple, TuplesToSchemaTupleList> tuples;
+        private SchemaTupleFactory tf;
+
+        public TupleToMapKey(int ct, SchemaTupleFactory tf) {
+            tuples = new HashMap<Tuple, TuplesToSchemaTupleList>(ct);
+            this.tf = tf;
+        }
+
+        public TuplesToSchemaTupleList put(Tuple key, TuplesToSchemaTupleList val) {
+            if (tf != null) {
+                key = TuplesToSchemaTupleList.convert(key, tf);
+            }
+            return tuples.put(key, val);
+        }
+
+        public TuplesToSchemaTupleList get(Tuple key) {
+            if (tf != null) {
+                key = TuplesToSchemaTupleList.convert(key, tf);
+            }
+            return tuples.get(key);
+        }
+    }
+
     /**
      * Builds the HashMaps by reading each replicated input from the DFS using a
      * Load operator
@@ -306,11 +357,29 @@ public class POFRJoin extends PhysicalOp
      * @throws ExecException
      */
     private void setUpHashMap() throws ExecException {
+        List<SchemaTupleFactory> inputSchemaTupleFactories = Lists.newArrayListWithCapacity(inputSchemas.size());
+        List<SchemaTupleFactory> keySchemaTupleFactories = Lists.newArrayListWithCapacity(inputSchemas.size());
+        for (int i = 0; i < inputSchemas.size(); i++) {
+            Schema schema = inputSchemas.get(i);
+            if (schema != null) {
+                log.debug("Using SchemaTuple for FR Join Schema: " + schema);
+                inputSchemaTupleFactories.add(SchemaTupleBackend.newSchemaTupleFactory(schema, false, GenContext.FR_JOIN));
+            } else {
+                // keep the factory lists index-aligned with the schema lists
+                inputSchemaTupleFactories.add(null);
+            }
+            schema = keySchemas.get(i);
+            if (schema != null) {
+                log.debug("Using SchemaTuple for FR Join key Schema: " + schema);
+                keySchemaTupleFactories.add(SchemaTupleBackend.newSchemaTupleFactory(schema, false, GenContext.FR_JOIN));
+            } else {
+                keySchemaTupleFactories.add(null);
+            }
+        }
+
         int i = -1;
         long time1 = System.currentTimeMillis();
         for (FileSpec replFile : replFiles) {
             ++i;
 
+            SchemaTupleFactory inputSchemaTupleFactory = inputSchemaTupleFactories.get(i);
+            SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories.get(i);
+
             if (i == fragment) {
                 replicates[i] = null;
                 continue;
@@ -330,12 +399,11 @@ public class POFRJoin extends PhysicalOp
             // same thing, so utilizing its functionality
             POLocalRearrange lr = LRs[i];
             lr.setInputs(Arrays.asList((PhysicalOperator) ld));
-            Map<Tuple, List<Tuple>> replicate = new HashMap<Tuple, List<Tuple>>(
-                    1000);
+
+            TupleToMapKey replicate = new TupleToMapKey(1000, keySchemaTupleFactory);
+
             log.debug("Completed setup. Trying to build replication hash table");
-            int cnt = 0;
             for (Result res = lr.getNext(dummyTuple);res.returnStatus != POStatus.STATUS_EOP;res = lr.getNext(dummyTuple)) {
-                ++cnt;
                 if (reporter != null)
                     reporter.progress();               
                 Tuple tuple = (Tuple) res.result;
@@ -343,12 +411,14 @@ public class POFRJoin extends PhysicalOp
                 Tuple key = mTupleFactory.newTuple(1);
                 key.set(0, tuple.get(1));
                 Tuple value = getValueTuple(lr, tuple);
-                if (!replicate.containsKey(key))
-                    replicate.put(key, new ArrayList<Tuple>(1));
+
+                if (replicate.get(key) == null) {
+                    replicate.put(key, new TuplesToSchemaTupleList(1, inputSchemaTupleFactory));
+                }
+
                 replicate.get(key).add(value);
             }
             replicates[i] = replicate;
-
         }
         long time2 = System.currentTimeMillis();
         log.debug("Hash Table built. Time taken: " + (time2 - time1));

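Illustrative use of the new TupleToMapKey wrapper (it is private to POFRJoin;
the variable names come from setUpHashMap above). Keys and values are
converted to generated SchemaTuples on their way into the map, which is where
the memory saving for replicated joins comes from:

    TupleToMapKey replicate = new TupleToMapKey(1000, keySchemaTupleFactory);
    if (replicate.get(key) == null) {
        replicate.put(key, new TuplesToSchemaTupleList(1, inputSchemaTupleFactory));
    }
    // add() converts the value to a SchemaTuple when a factory is present
    replicate.get(key).add(value);
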
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Tue Jul  3 20:36:09 2012
@@ -20,8 +20,6 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -39,22 +37,24 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.SchemaTuple;
+import org.apache.pig.data.SchemaTupleBackend;
+import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
+import org.apache.pig.data.SchemaTupleFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.data.TupleMaker;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.DefaultIndexableLoader;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.IdentityHashSet;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.newplan.logical.relational.LOJoin;
-import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
 
 /** This operator implements the merge join algorithm to do map-side joins.
  *  Currently, only two-way joins are supported. One input of the join is identified as left
@@ -87,8 +87,6 @@ public class POMergeJoin extends Physica
 
     private Result prevRightInp;
 
-    private transient TupleFactory mTupleFactory;
-
     //boolean denoting whether we are generating joined tuples in this getNext() call or do we need to read in more data.
     private boolean doingJoin;
 
@@ -99,7 +97,7 @@ public class POMergeJoin extends Physica
     private String indexFile;
 
     // Buffer to hold accumulated left tuples.
-    private List<Tuple> leftTuples;
+    private transient TuplesToSchemaTupleList leftTuples;
 
     private MultiMap<PhysicalOperator, PhysicalPlan> inpPlans;
 
@@ -125,6 +123,20 @@ public class POMergeJoin extends Physica
 
     private String signature;
 
+    // This serves as the default TupleFactory
+    private transient TupleFactory mTupleFactory;
+
+    /**
+     * These TupleFactories are used for more efficient Tuple generation. This should
+     * decrease the amount of memory needed for a given map task to successfully perform
+     * a merge join.
+     */
+    private transient TupleMaker mergedTupleMaker;
+    private transient TupleMaker leftTupleMaker;
+
+    private Schema leftInputSchema;
+    private Schema mergedInputSchema;
+
     /**
      * @param k
      * @param rp
@@ -133,18 +145,18 @@ public class POMergeJoin extends Physica
      * Ex. join A by ($0,$1), B by ($1,$2);
      */
     public POMergeJoin(OperatorKey k, int rp, List<PhysicalOperator> inp, MultiMap<PhysicalOperator, PhysicalPlan> inpPlans,
-            List<List<Byte>> keyTypes, LOJoin.JOINTYPE joinType) throws PlanException{
+            List<List<Byte>> keyTypes, LOJoin.JOINTYPE joinType, Schema leftInputSchema, Schema rightInputSchema, Schema mergedInputSchema) throws PlanException{
 
         super(k, rp, inp);
         this.opKey = k;
         this.doingJoin = false;
         this.inpPlans = inpPlans;
         LRs = new POLocalRearrange[2];
-        mTupleFactory = TupleFactory.getInstance();
-        leftTuples = new ArrayList<Tuple>(arrayListSize);
         this.createJoinPlans(inpPlans,keyTypes);
         this.indexFile = null;
         this.joinType = joinType;  
+        this.leftInputSchema = leftInputSchema;
+        this.mergedInputSchema = mergedInputSchema;
     }
 
     /**
@@ -169,6 +181,90 @@ public class POMergeJoin extends Physica
         }
     }
 
+    /**
+     * This is a helper method that sets up all of the TupleFactory members.
+     */
+    private void prepareTupleFactories() {
+        mTupleFactory = TupleFactory.getInstance();
+
+        if (leftInputSchema != null) {
+            leftTupleMaker = SchemaTupleBackend.newSchemaTupleFactory(leftInputSchema, false, GenContext.MERGE_JOIN);
+        }
+        if (leftTupleMaker == null) {
+            log.debug("No SchemaTupleFactory available for combined left merge join schema: " + leftInputSchema);
+            leftTupleMaker = mTupleFactory;
+        } else {
+            log.debug("Using SchemaTupleFactory for left merge join schema: " + leftInputSchema);
+        }
+
+        if (mergedInputSchema != null) {
+            mergedTupleMaker = SchemaTupleBackend.newSchemaTupleFactory(mergedInputSchema, false, GenContext.MERGE_JOIN);
+        }
+        if (mergedTupleMaker == null) {
+            log.debug("No SchemaTupleFactory available for combined left/right merge join schema: " + mergedInputSchema);
+            mergedTupleMaker = mTupleFactory;
+        } else {
+            log.debug("Using SchemaTupleFactory for left/right merge join schema: " + mergedInputSchema);
+        }
+
+    }
+
+    /**
+     * This provides a List to store Tuples in. The implementation of that list depends on whether
+     * or not there is a SchemaTupleFactory available.
+     * @return the list object to store Tuples in
+     */
+    private TuplesToSchemaTupleList newLeftTupleArray() {
+        SchemaTupleFactory stf = (leftTupleMaker instanceof SchemaTupleFactory)
+                ? (SchemaTupleFactory) leftTupleMaker : null;
+        return new TuplesToSchemaTupleList(arrayListSize, stf);
+    }
+
+    /**
+     * This is a class that extends ArrayList, making it easy to provide on the fly conversion
+     * from Tuple to SchemaTuple. This is necessary because we are not getting SchemaTuples
+     * from the source, though in the future that is what we would like to do.
+     */
+    protected static class TuplesToSchemaTupleList {
+        private List<Tuple> tuples;
+        private SchemaTupleFactory tf;
+
+        protected TuplesToSchemaTupleList(int ct, SchemaTupleFactory tf) {
+            tuples = new ArrayList<Tuple>(ct);
+            this.tf = tf;
+        }
+
+        public static SchemaTuple<?> convert(Tuple t, SchemaTupleFactory tf) {
+            if (t instanceof SchemaTuple<?>) {
+                return (SchemaTuple<?>)t;
+            }
+            SchemaTuple<?> st = tf.newTuple();
+            try {
+                return st.set(t);
+            } catch (ExecException e) {
+                throw new RuntimeException("Unable to set SchemaTuple with schema ["
+                        + st.getSchemaString() + "] with given Tuple in merge join.", e);
+            }
+        }
+
+        public boolean add(Tuple t) {
+            if (tf != null) {
+                t = convert(t, tf);
+            }
+            return tuples.add(t);
+        }
+
+        public Tuple get(int i) {
+            return tuples.get(i);
+        }
+
+        public int size() {
+            return tuples.size();
+        }
+
+        public List<Tuple> getList() {
+            return tuples;
+        }
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public Result getNext(Tuple t) throws ExecException {
@@ -177,6 +273,9 @@ public class POMergeJoin extends Physica
         Result curLeftInp;
 
         if(firstTime){
+            prepareTupleFactories();
+            leftTuples = newLeftTupleArray();
+
             // Do initial setup.
             curLeftInp = processInput();
             if(curLeftInp.returnStatus != POStatus.STATUS_OK)
@@ -191,7 +290,7 @@ public class POMergeJoin extends Physica
             } catch (IOException e) {
                 throwProcessingException(true, e);
             } catch (ClassCastException e) {
-                throwProcessingException(true, e);;
+                throwProcessingException(true, e);
             }
             leftTuples.add((Tuple)curLeftInp.result);
             firstTime = false;
@@ -205,13 +304,15 @@ public class POMergeJoin extends Physica
             if(counter > 0){    // We have left tuples to join with current right tuple.
                 Tuple joiningLeftTup = leftTuples.get(--counter);
                 leftTupSize = joiningLeftTup.size();
-                Tuple joinedTup = mTupleFactory.newTuple(leftTupSize+rightTupSize);
+                Tuple joinedTup = mergedTupleMaker.newTuple(leftTupSize + rightTupSize);
 
-                for(int i=0; i<leftTupSize; i++)
+                for(int i=0; i<leftTupSize; i++) {
                     joinedTup.set(i, joiningLeftTup.get(i));
+                }
 
-                for(int i=0; i < rightTupSize; i++)
+                for(int i=0; i < rightTupSize; i++) {
                     joinedTup.set(i+leftTupSize, curJoiningRightTup.get(i));
+                }
 
                 return new Result(POStatus.STATUS_OK, joinedTup);
             }
@@ -246,7 +347,8 @@ public class POMergeJoin extends Physica
                             prevRightKey = rightKey;
                             prevRightInp = rightInp;
                             // There cant be any more join on this key.
-                            leftTuples = new ArrayList<Tuple>(arrayListSize);
+                            leftTuples = newLeftTupleArray();
+
                             leftTuples.add((Tuple)prevLeftInp.result);
                         }
 
@@ -315,7 +417,7 @@ public class POMergeJoin extends Physica
 
             // This will happen when we accumulated inputs on left side and moved on, but are still behind the right side
             // In that case, throw away the tuples accumulated till now and add the one we read in this function call.
-            leftTuples = new ArrayList<Tuple>(arrayListSize);
+            leftTuples = newLeftTupleArray();
             leftTuples.add((Tuple)curLeftInp.result);
             prevLeftInp = curLeftInp;
             prevLeftKey = curLeftKey;
@@ -387,7 +489,7 @@ public class POMergeJoin extends Physica
                 prevRightKey = rightKey;
                 prevRightInp = rightInp;
                 // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call. 
-                leftTuples = new ArrayList<Tuple>(arrayListSize);
+                leftTuples = newLeftTupleArray();
                 leftTuples.add((Tuple)curLeftInp.result);
                 prevLeftInp = curLeftInp;
                 prevLeftKey = curLeftKey;

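A condensed view of how the merge join now manages its left-side buffer (the
calls are from the hunks above; the control flow between them is abridged):

    // On the first getNext() call, pick factories and create the buffer:
    prepareTupleFactories();           // SchemaTupleFactory, or the default
    leftTuples = newLeftTupleArray();  // buffer that converts on add()

    // Whenever the join key advances, the buffer is replaced rather than
    // cleared, letting tuples for the previous key be garbage collected:
    leftTuples = newLeftTupleArray();
    leftTuples.add((Tuple) curLeftInp.result);
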
Modified: pig/trunk/src/org/apache/pig/builtin/mock/Storage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/mock/Storage.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/mock/Storage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/mock/Storage.java Tue Jul  3 20:36:09 2012
@@ -1,10 +1,5 @@
 package org.apache.pig.builtin.mock;
 
-import static junit.framework.Assert.assertEquals;
-import static org.apache.pig.builtin.mock.Storage.resetData;
-import static org.apache.pig.builtin.mock.Storage.schema;
-import static org.apache.pig.builtin.mock.Storage.tuple;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -30,7 +25,6 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
-import org.apache.pig.ExecType;
 import org.apache.pig.Expression;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;

Added: pig/trunk/src/org/apache/pig/data/AppendableSchemaTuple.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/AppendableSchemaTuple.java?rev=1356921&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/AppendableSchemaTuple.java (added)
+++ pig/trunk/src/org/apache/pig/data/AppendableSchemaTuple.java Tue Jul  3 20:36:09 2012
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.utils.SedesHelper;
+
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public abstract class AppendableSchemaTuple<T extends AppendableSchemaTuple<T>> extends SchemaTuple<T> {
+    private static final long serialVersionUID = 1L;
+
+    private Tuple appendedFields;
+
+    private static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+    @Override
+    public void append(Object val) {
+        if (appendedFields == null) {
+            appendedFields = mTupleFactory.newTuple();
+        }
+
+        appendedFields.append(val);
+    }
+
+    protected int appendedFieldsSize() {
+        return appendedFields == null ? 0 : appendedFields.size();
+    }
+
+    protected boolean isAppendedFieldsNull() {
+        return appendedFieldsSize() == 0;
+    }
+
+    protected Object getAppendedField(int i) throws ExecException {
+        return isAppendedFieldNull(i) ? null : appendedFields.get(i);
+    }
+
+    private boolean isAppendedFieldNull(int i) throws ExecException {
+        return isAppendedFieldsNull() || appendedFields.isNull(i);
+    }
+
+    public Tuple getAppendedFields() {
+        return appendedFields;
+    }
+
+    protected void setAppendedFields(Tuple t) {
+        appendedFields = t;
+    }
+
+    private void resetAppendedFields() {
+        appendedFields = null;
+    }
+
+    private void setAppendedField(int fieldNum, Object val) throws ExecException {
+        appendedFields.set(fieldNum, val);
+    }
+
+    /**
+     * This adds the additional overhead of the append Tuple
+     */
+    @Override
+    public long getMemorySize() {
+        long appended = appendedFields == null ? 0L : SizeUtil.roundToEight(appendedFields.getMemorySize());
+        return appended + super.getMemorySize();
+    }
+
+
+    private byte getAppendedFieldType(int i) throws ExecException {
+        return appendedFields == null ? DataType.UNKNOWN : appendedFields.getType(i);
+    }
+
+    protected SchemaTuple<T> set(SchemaTuple<?> t, boolean checkType) throws ExecException {
+        resetAppendedFields();
+        for (int j = schemaSize(); j < t.size(); j++) {
+            append(t.get(j));
+        }
+        return super.set(t, checkType);
+    }
+
+    protected SchemaTuple<T> setSpecific(T t) {
+        resetAppendedFields();
+        setAppendedFields(t.getAppendedFields());
+        return super.setSpecific(t);
+    }
+
+    public SchemaTuple<T> set(List<Object> l) throws ExecException {
+        int listSize = l.size();
+        int schemaSize = schemaSize();
+
+        if (listSize < schemaSize) {
+            throw new ExecException("Given list of objects has too few fields ("+l.size()+" vs "+schemaSize()+")");
+        }
+
+        Iterator<Object> it = l.iterator();
+
+        generatedCodeSetIterator(it);
+
+        resetAppendedFields();
+
+        while (it.hasNext()) {
+            append(it.next());
+        }
+
+        return this;
+    }
+
+    protected int compareTo(SchemaTuple<?> t, boolean checkType) {
+        if (checkType && getClass() == t.getClass()) {
+            return compareToSpecific((T)t);
+        }
+        int i = super.compareTo(t, false);
+        if (i != 0) {
+            return i;
+        }
+        if (appendedFieldsSize() > 0) {
+            int m = schemaSize();
+            for (int k = 0; k < size() - schemaSize(); k++) {
+                try {
+                    i = DataType.compare(getAppendedField(k), t.get(m++));
+                } catch (ExecException e) {
+                    throw new RuntimeException("Unable to get append value", e);
+                }
+                if (i != 0) {
+                    return i;
+                }
+            }
+        }
+        return 0;
+    }
+
+    protected int compareToSpecific(T t) {
+        int i = compareSize(t);
+        if (i != 0) {
+            return i;
+        }
+        i = super.compareToSpecific(t);
+        if (i != 0) {
+            return i;
+        }
+        for (int z = 0; z < appendedFieldsSize(); z++) {
+            try {
+                i = DataType.compare(getAppendedField(z), t.getAppendedField(z));
+            } catch (ExecException e) {
+                throw new RuntimeException("Unable to get append", e);
+            }
+            if (i != 0) {
+                return i;
+            }
+        }
+        return 0;
+    }
+
+    public int hashCode() {
+        return super.hashCode() + (appendedFields == null ? 0 : appendedFields.hashCode());
+    }
+
+    public void set(int fieldNum, Object val) throws ExecException {
+        int diff = fieldNum - schemaSize();
+        if (diff >= 0 && diff < appendedFieldsSize()) {
+            setAppendedField(diff, val);
+        } else {
+            super.set(fieldNum, val);
+        }
+    }
+
+    @Override
+    public Object get(int fieldNum) throws ExecException {
+        int diff = fieldNum - schemaSize();
+        if (diff >= 0 && diff < appendedFieldsSize()) {
+            return getAppendedField(diff);
+        } else {
+            return super.get(fieldNum);
+        }
+    }
+
+    @Override
+    public boolean isNull(int fieldNum) throws ExecException {
+        int diff = fieldNum - schemaSize();
+        if (diff >= 0 && diff < appendedFieldsSize()) {
+            return isAppendedFieldNull(diff);
+        } else {
+            return super.isNull(fieldNum);
+        }
+    }
+
+    @Override
+    public byte getType(int fieldNum) throws ExecException {
+        int diff = fieldNum - schemaSize();
+        if (diff >= 0 && diff < appendedFieldsSize()) {
+            return getAppendedFieldType(diff);
+        } else {
+            return super.getType(fieldNum);
+        }
+    }
+
+    @Override
+    protected void setTypeAwareBase(int fieldNum, Object val, String type) throws ExecException {
+        int diff = fieldNum - schemaSize();
+        if (diff >= 0 && diff < appendedFieldsSize()) {
+            setAppendedField(diff, val);
+        } else {
+            super.setTypeAwareBase(fieldNum, val, type);
+        }
+    }
+
+    @Override
+    protected Object getTypeAwareBase(int fieldNum, String type) throws ExecException {
+        int diff = fieldNum - schemaSize();
+        if (diff >= 0 && diff < appendedFieldsSize()) {
+            return getAppendedField(diff);
+        } else {
+            return super.getTypeAwareBase(fieldNum, type);
+        }
+    }
+
+    protected void writeElements(DataOutput out) throws IOException {
+        boolean[] b = generatedCodeNullsArray();
+        SedesHelper.writeBooleanArray(out, b, isAppendedFieldsNull());
+        generatedCodeWriteElements(out);
+        if (!isAppendedFieldsNull()) {
+            SedesHelper.writeGenericTuple(out, getAppendedFields());
+        }
+    }
+
+    public int size() {
+        return super.size() + appendedFieldsSize();
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        int len = schemaSize() + 1;
+        boolean[] b = SedesHelper.readBooleanArray(in, len);
+        generatedCodeReadFields(in, b);
+        if (!b[len - 1]) {
+            setAppendedFields(SedesHelper.readGenericTuple(in, in.readByte()));
+        }
+    }
+}
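
The accessor methods above (set, get, isNull, getType, and the type-aware
variants) all share one dispatch rule: fieldNum - schemaSize() routes a
logical field number either to a generated slot or to a position in the
appended-fields tuple. The serialization methods rely on a related
convention: readFields() sizes its null-flag array as schemaSize() + 1
because writeElements() emits one flag per generated field plus a trailing
flag recording whether any appended fields were written at all. A minimal
standalone sketch of the dispatch rule (class and method names here are
illustrative, not part of the generated code):

    public class AppendedIndexDemo {
        // Mirrors the fieldNum - schemaSize() dispatch used above, for a
        // generated class with three schema slots and two appended fields.
        static String resolve(int fieldNum, int schemaSize, int appendedSize) {
            int diff = fieldNum - schemaSize;
            if (diff >= 0 && diff < appendedSize) {
                return "appended slot " + diff;
            }
            return "generated slot " + fieldNum; // super handles bounds errors
        }

        public static void main(String[] args) {
            for (int fieldNum = 0; fieldNum < 5; fieldNum++) {
                System.out.println(fieldNum + " -> " + resolve(fieldNum, 3, 2));
            }
        }
    }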

Modified: pig/trunk/src/org/apache/pig/data/BinInterSedes.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/BinInterSedes.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/BinInterSedes.java (original)
+++ pig/trunk/src/org/apache/pig/data/BinInterSedes.java Tue Jul  3 20:36:09 2012
@@ -40,6 +40,7 @@ import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.data.utils.SedesHelper;
 import org.apache.pig.impl.util.ObjectSerializer;
 
 /**
@@ -92,46 +93,53 @@ public class BinInterSedes implements In
 
     public static final byte NULL = 27;
 
-    // These are special bytes to mark optimized "primitive" tuples.
-    public static final byte PINT_TUPLE = 28;
-    public static final byte PFLOAT_TUPLE = 29;
-    public static final byte PLONG_TUPLE = 30;
-    public static final byte PDOUBLE_TUPLE = 31;
-    public static final byte PSTRING_TUPLE = 32;
-    public static final byte PBOOL_TUPLE = 33;
-    public static final byte PRIMITIVE_TUPLE = 34;
-
-    public static final byte LONG_INBYTE = 35;
-    public static final byte LONG_INSHORT = 36;
-    public static final byte LONG_ININT = 37;
-    public static final byte LONG_0 = 38;
-    public static final byte LONG_1 = 39;
+    public static final byte SCHEMA_TUPLE_BYTE_INDEX = 28;
+    public static final byte SCHEMA_TUPLE_SHORT_INDEX = 29;
+    public static final byte SCHEMA_TUPLE = 30;
+
+    public static final byte LONG_INBYTE = 31;
+    public static final byte LONG_INSHORT = 32;
+    public static final byte LONG_ININT = 33;
+    public static final byte LONG_0 = 34;
+    public static final byte LONG_1 = 35;
 
     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
     private static BagFactory mBagFactory = BagFactory.getInstance();
-    static final int UNSIGNED_SHORT_MAX = 65535;
-    static final int UNSIGNED_BYTE_MAX = 255;
+    public static final int UNSIGNED_SHORT_MAX = 65535;
+    public static final int UNSIGNED_BYTE_MAX = 255;
     public static final String UTF8 = "UTF-8";
 
-    private Tuple readTuple(DataInput in, byte type) throws IOException {
-        // Read the size.
-        int sz = getTupleSize(in, type);
-
-        Tuple t = mTupleFactory.newTuple(sz);
-        for (int i = 0; i < sz; i++) {
-            t.set(i, readDatum(in));
+    public Tuple readTuple(DataInput in, byte type) throws IOException {
+        switch (type) {
+        case TUPLE:
+        case TINYTUPLE:
+        case SMALLTUPLE:
+            return SedesHelper.readGenericTuple(in, type);
+        case SCHEMA_TUPLE_BYTE_INDEX:
+        case SCHEMA_TUPLE_SHORT_INDEX:
+        case SCHEMA_TUPLE:
+            return readSchemaTuple(in, type);
+        default:
+            throw new ExecException("Unknown Tuple type found in stream: " + type);
+        }
         }
-        return t;
 
+    private Tuple readSchemaTuple(DataInput in, byte type) throws IOException {
+        int id;
+        switch (type) {
+        case (SCHEMA_TUPLE_BYTE_INDEX): id = in.readUnsignedByte(); break;
+        case (SCHEMA_TUPLE_SHORT_INDEX): id = in.readUnsignedShort(); break;
+        case (SCHEMA_TUPLE): id = in.readInt(); break;
+        default: throw new RuntimeException("Invalid type given to readSchemaTuple: " + type);
     }
 
-    private Tuple readPrimitiveTuple(DataInput in) throws IOException {
-        PrimitiveTuple t = new PrimitiveTuple();
-        t.readFields(in);
-        return t;
+        Tuple st = SchemaTupleFactory.getInstance(id).newTuple();
+        st.readFields(in);
+
+        return st;
     }
 
-    private int getTupleSize(DataInput in, byte type) throws IOException {
+    public int getTupleSize(DataInput in, byte type) throws IOException {
         int sz;
         switch (type) {
         case TINYTUPLE:
@@ -223,17 +231,6 @@ public class BinInterSedes implements In
         return m;
     }
 
-    private static String readCharArray(DataInput in) throws IOException {
-        return in.readUTF();
-    }
-
-    private static String readBigCharArray(DataInput in) throws IOException {
-        int size = in.readInt();
-        byte[] ba = new byte[size];
-        in.readFully(ba);
-        return new String(ba, UTF8);
-    }
-
     private WritableComparable readWritable(DataInput in) throws IOException {
         String className = (String) readDatum(in);
         // create the writeable class . It needs to have a default constructor
@@ -287,7 +284,7 @@ public class BinInterSedes implements In
         case TUPLE:
         case TINYTUPLE:
         case SMALLTUPLE:
-            return readTuple(in, type);
+            return SedesHelper.readGenericTuple(in, type);
 
         case BAG:
         case TINYBAG:
@@ -341,60 +338,26 @@ public class BinInterSedes implements In
         case BYTE:
             return Byte.valueOf(in.readByte());
 
-        case TINYBYTEARRAY: {
-            int size = in.readUnsignedByte();
-            return readBytes(in, size);
-        }
-
-        case SMALLBYTEARRAY: {
-            int size = in.readUnsignedShort();
-            return readBytes(in, size);
-        }
-
-        case BYTEARRAY: {
-            int size = in.readInt();
-            return readBytes(in, size);
-        }
+        case TINYBYTEARRAY:
+        case SMALLBYTEARRAY:
+        case BYTEARRAY:
+            return new DataByteArray(SedesHelper.readBytes(in, type));
 
         case CHARARRAY:
-            return readBigCharArray(in);
-
         case SMALLCHARARRAY:
-            return readCharArray(in);
+            return SedesHelper.readChararray(in, type);
 
         case GENERIC_WRITABLECOMPARABLE:
             return readWritable(in);
 
-        case PRIMITIVE_TUPLE:
-            return readPrimitiveTuple(in);
+        case SCHEMA_TUPLE_BYTE_INDEX:
+        case SCHEMA_TUPLE_SHORT_INDEX:
+        case SCHEMA_TUPLE:
+            return readSchemaTuple(in, type);
 
         case NULL:
             return null;
 
-        case PINT_TUPLE:
-            Tuple t = new PIntTuple();
-            t.readFields(in);
-            return t;
-        case PFLOAT_TUPLE:
-            t = new PFloatTuple();
-            t.readFields(in);
-            return t;
-        case PLONG_TUPLE:
-            t = new PLongTuple();
-            t.readFields(in);
-            return t;
-        case PDOUBLE_TUPLE:
-            t = new PDoubleTuple();
-            t.readFields(in);
-            return t;
-        case PSTRING_TUPLE:
-            t = new PStringTuple();
-            t.readFields(in);
-            return t;
-        case PBOOL_TUPLE:
-            t = new PBooleanTuple();
-            t.readFields(in);
-            return t;
         default:
             throw new RuntimeException("Unexpected data type " + type + " found in stream.");
         }
@@ -505,38 +468,14 @@ public class BinInterSedes implements In
 
         case DataType.BYTEARRAY: {
             DataByteArray bytes = (DataByteArray) val;
-            final int sz = bytes.size();
-            if (sz < UNSIGNED_BYTE_MAX) {
-                out.writeByte(TINYBYTEARRAY);
-                out.writeByte(sz);
-            } else if (sz < UNSIGNED_SHORT_MAX) {
-                out.writeByte(SMALLBYTEARRAY);
-                out.writeShort(sz);
-            } else {
-                out.writeByte(BYTEARRAY);
-                out.writeInt(sz);
-            }
-            out.write(bytes.mData);
-
+            SedesHelper.writeBytes(out, bytes.mData);
             break;
 
         }
 
         case DataType.CHARARRAY: {
             String s = (String) val;
-            // a char can take up to 3 bytes in the modified utf8 encoding
-            // used by DataOutput.writeUTF, so use UNSIGNED_SHORT_MAX/3
-            if (s.length() < UNSIGNED_SHORT_MAX / 3) {
-                out.writeByte(SMALLCHARARRAY);
-                out.writeUTF(s);
-            } else {
-                byte[] utfBytes = s.getBytes(UTF8);
-                int length = utfBytes.length;
-
-                out.writeByte(CHARARRAY);
-                out.writeInt(length);
-                out.write(utfBytes);
-            }
+            SedesHelper.writeChararray(out, s);
             break;
         }
         case DataType.GENERIC_WRITABLECOMPARABLE:
@@ -606,21 +545,7 @@ public class BinInterSedes implements In
         if (t instanceof TypeAwareTuple) {
             t.write(out);
         } else {
-        final int sz = t.size();
-        if (sz < UNSIGNED_BYTE_MAX) {
-            out.writeByte(TINYTUPLE);
-            out.writeByte(sz);
-        } else if (sz < UNSIGNED_SHORT_MAX) {
-            out.writeByte(SMALLTUPLE);
-            out.writeShort(sz);
-        } else {
-            out.writeByte(TUPLE);
-            out.writeInt(sz);
-        }
-
-        for (int i = 0; i < sz; i++) {
-            writeDatum(out, t.get(i));
-        }
+            SedesHelper.writeGenericTuple(out, t);
     }
     }
 
@@ -632,17 +557,11 @@ public class BinInterSedes implements In
     @Override
     public void addColsToTuple(DataInput in, Tuple t) throws IOException {
         byte type = in.readByte();
-        switch (type) {
-        case PRIMITIVE_TUPLE:
-            t.readFields(in);
-            break;
-        default:
         int sz = getTupleSize(in, type);
         for (int i = 0; i < sz; i++) {
             t.append(readDatum(in));
         }
     }
-    }
     
     public static class BinInterSedesTupleRawComparator extends WritableComparator implements TupleRawComparator {
 
@@ -1243,4 +1162,17 @@ public class BinInterSedes implements In
     public Class<? extends TupleRawComparator> getTupleRawComparatorClass() {
         return BinInterSedesTupleRawComparator.class;
     }
+
+    public Tuple readTuple(DataInput in) throws IOException {
+        return readTuple(in, in.readByte());
+    }
+
+    public static boolean isTupleByte(byte b) {
+        return b == BinInterSedes.TUPLE
+            || b == BinInterSedes.SMALLTUPLE
+            || b == BinInterSedes.TINYTUPLE
+            || b == BinInterSedes.SCHEMA_TUPLE
+            || b == BinInterSedes.SCHEMA_TUPLE_BYTE_INDEX
+            || b == BinInterSedes.SCHEMA_TUPLE_SHORT_INDEX;
+    }
 }
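
readSchemaTuple() above decodes the identifier of the generated class at a
width selected by the marker byte. The matching write side is not in this
hunk (it lives with the SchemaTuple serialization code), but a sketch of the
encoding it implies, assuming a non-negative id and the same strictly-less-than
convention the removed byte-array code used, would look like:

    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.pig.data.BinInterSedes;

    // Hypothetical mirror of readSchemaTuple(); the constants are the ones
    // declared in BinInterSedes above.
    public final class SchemaTupleIdWriter {
        static void writeId(DataOutput out, int id) throws IOException {
            if (id < BinInterSedes.UNSIGNED_BYTE_MAX) {
                out.writeByte(BinInterSedes.SCHEMA_TUPLE_BYTE_INDEX);
                out.writeByte(id);      // read back with readUnsignedByte()
            } else if (id < BinInterSedes.UNSIGNED_SHORT_MAX) {
                out.writeByte(BinInterSedes.SCHEMA_TUPLE_SHORT_INDEX);
                out.writeShort(id);     // read back with readUnsignedShort()
            } else {
                out.writeByte(BinInterSedes.SCHEMA_TUPLE);
                out.writeInt(id);       // read back with readInt()
            }
        }
    }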

Modified: pig/trunk/src/org/apache/pig/data/BinSedesTupleFactory.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/BinSedesTupleFactory.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/BinSedesTupleFactory.java (original)
+++ pig/trunk/src/org/apache/pig/data/BinSedesTupleFactory.java Tue Jul  3 20:36:09 2012
@@ -17,7 +17,6 @@
  */
 package org.apache.pig.data;
 
-import java.lang.Class;
 import java.util.List;
 
 import org.apache.pig.backend.executionengine.ExecException;
@@ -75,5 +74,9 @@ public class BinSedesTupleFactory extend
         return BinSedesTuple.getComparatorClass();
     }
 
+    @Override
+    public boolean isFixedSize() {
+        return false;
+    }
 }
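
isFixedSize() lets generic code ask whether a factory's tuples can grow
after construction; BinSedesTuple supports append(), hence false here. A
hypothetical guard built on it:

    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    // Sketch only: append is safe when the factory's tuples are growable.
    public class AppendGuard {
        public static void main(String[] args) {
            TupleFactory tf = TupleFactory.getInstance();
            Tuple t = tf.newTuple(3);
            if (!tf.isFixedSize()) {
                t.append("extra"); // e.g. BinSedesTuple accepts this
            }
        }
    }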
 

Modified: pig/trunk/src/org/apache/pig/data/DataByteArray.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DataByteArray.java?rev=1356921&r1=1356920&r2=1356921&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DataByteArray.java (original)
+++ pig/trunk/src/org/apache/pig/data/DataByteArray.java Tue Jul  3 20:36:09 2012
@@ -19,6 +19,7 @@ package org.apache.pig.data;
 
 import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
 
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
@@ -229,13 +230,11 @@ public class DataByteArray implements Co
 
     @Override
     public int hashCode() {
-        int hash = 1;
-        for (int i = 0; i < mData.length; i++) {
-            // 29 chosen because hash uses 31 and bag 37, and a I want a
-            // prime.
-            hash = 29 * hash + mData[i];
+        return hashCode(mData);
         }
-        return hash;
+
+    public static int hashCode(byte[] buf) {
+        return Arrays.hashCode(buf);
     }
 
 }
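
One behavioral note on the hashCode() change: Arrays.hashCode uses the
standard 31-based polynomial (and returns 0 for a null array), while the
removed loop used 29, so the same bytes hash to different values before and
after this patch. Since all tasks in a job run the same Pig version and these
hashes are not persisted, that should be invisible in practice. A quick
illustration of the difference:

    import java.util.Arrays;

    // Compares the pre-patch 29-based hash with the new delegation.
    public class HashDemo {
        public static void main(String[] args) {
            byte[] data = {1, 2, 3};
            int oldHash = 1;
            for (byte b : data) {
                oldHash = 29 * oldHash + b;      // pre-patch formula
            }
            int newHash = Arrays.hashCode(data); // post-patch delegation
            System.out.println(oldHash + " != " + newHash); // 25291 != 30817
        }
    }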

Added: pig/trunk/src/org/apache/pig/data/FieldIsNullException.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/FieldIsNullException.java?rev=1356921&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/data/FieldIsNullException.java (added)
+++ pig/trunk/src/org/apache/pig/data/FieldIsNullException.java Tue Jul  3 20:36:09 2012
@@ -0,0 +1,12 @@
+package org.apache.pig.data;
+
+import org.apache.pig.backend.executionengine.ExecException;
+
+public class FieldIsNullException extends ExecException {
+    public FieldIsNullException() {
+    }
+
+    public FieldIsNullException(String msg) {
+        super(msg);
+    }
+}
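
Because FieldIsNullException extends ExecException, existing catch blocks
keep working, while callers that care can tell a null field apart from other
access failures. A hypothetical helper (getInt(int) is assumed from the
TypeAwareTuple interface):

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.data.FieldIsNullException;
    import org.apache.pig.data.TypeAwareTuple;

    public class NullSafeAccess {
        // Treats a null field as a value rather than a failure; any other
        // ExecException still propagates to the caller.
        static Integer intOrNull(TypeAwareTuple t, int fieldNum) throws ExecException {
            try {
                return t.getInt(fieldNum);
            } catch (FieldIsNullException e) {
                return null;
            }
        }
    }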


