pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject svn commit: r1604517 - in /pig/trunk: CHANGES.txt src/org/apache/pig/newplan/logical/relational/LOUnion.java test/org/apache/pig/test/TestUnionOnSchema.java
Date Sun, 22 Jun 2014 03:11:17 GMT
Author: daijy
Date: Sun Jun 22 03:11:17 2014
New Revision: 1604517

URL: http://svn.apache.org/r1604517
Log:
PIG-4018: Schema validation fails with UNION ONSCHEMA

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
    pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1604517&r1=1604516&r2=1604517&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Jun 22 03:11:17 2014
@@ -40,6 +40,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-4018: Schema validation fails with UNION ONSCHEMA (daijy)
+
 PIG-4022: Fix tez e2e test SkewedJoin_6 (daijy)
 
 PIG-4001: POPartialAgg aggregates too aggressively when multiple values aggregated (tmwoodruff via cheolsoo)

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java?rev=1604517&r1=1604516&r2=1604517&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java Sun Jun 22 03:11:17 2014
@@ -76,21 +76,29 @@ public class LOUnion extends LogicalRela
         }
         
         LogicalSchema mergedSchema = null;
-        LogicalSchema s0 = ((LogicalRelationalOperator)inputs.get(0)).getSchema();
         if ( inputs.size() == 1 )
-            return schema = s0;
+            return schema = ((LogicalRelationalOperator)inputs.get(0)).getSchema();
+        
+        List<String> inputAliases = new ArrayList<String>(inputs.size());
+        List<LogicalSchema> inputSchemas = new ArrayList<LogicalSchema>(inputs.size());
+        for (Operator input : inputs) {
+            LogicalRelationalOperator lop = (LogicalRelationalOperator)input;
+            inputAliases.add(lop.getAlias());
+            inputSchemas.add(lop.getSchema());
+        }
         
         if( isOnSchema() ) {
-            mergedSchema = createMergedSchemaOnAlias( inputs );
+            mergedSchema = createMergedSchemaOnAlias( inputSchemas, inputAliases );
         } else {
-            LogicalSchema s1 = ((LogicalRelationalOperator)inputs.get(1)).getSchema();
+            LogicalSchema s0 = inputSchemas.get(0);
+            LogicalSchema s1 = inputSchemas.get(1);
             mergedSchema = LogicalSchema.merge(s0, s1, LogicalSchema.MergeMode.Union);
             if (mergedSchema==null)
                 return null;
             
             // Merge schema
-            for (int i=2;i<inputs.size();i++) {
-                LogicalSchema otherSchema = ((LogicalRelationalOperator)inputs.get(i)).getSchema();
+            for (int i=2;i<inputSchemas.size();i++) {
+                LogicalSchema otherSchema = inputSchemas.get(i);
                 if (mergedSchema==null || otherSchema==null)
                     return null;
                 mergedSchema = LogicalSchema.merge(mergedSchema, otherSchema, LogicalSchema.MergeMode.Union);
@@ -100,28 +108,36 @@ public class LOUnion extends LogicalRela
         }
 
         // Bring back cached uid if any; otherwise, cache uid generated
-        for (int i=0;i<s0.size();i++)
+        for (int i=0;i<mergedSchema.size();i++)
         {
-            LogicalSchema.LogicalFieldSchema outputFieldSchema;
-            if (onSchema) {
-                outputFieldSchema = mergedSchema.getFieldSubNameMatch(s0.getField(i).alias);
-            } else {
-                outputFieldSchema = mergedSchema.getField(i);
-            }
+            LogicalSchema.LogicalFieldSchema outputFieldSchema = mergedSchema.getField(i);
+
             long uid = -1;
-            for (Pair<Long, Long> pair : uidMapping) {
-                if (pair.second==s0.getField(i).uid) {
-                    uid = pair.first;
-                    break;
+            
+            // Search all the cached uid mappings by input field to see if 
+            // we've cached an output uid for this output field
+            for (LogicalSchema inputSchema : inputSchemas) {
+                LogicalSchema.LogicalFieldSchema inputFieldSchema;
+                if (onSchema) {
+                    inputFieldSchema = inputSchema.getFieldSubNameMatch(outputFieldSchema.alias);
+                } else {
+                    inputFieldSchema = inputSchema.getField(i);
+                }
+                
+                if (inputFieldSchema != null) {
+                    uid = getCachedOuputUid(inputFieldSchema.uid);
+                    if (uid >= 0) break;
                 }
             }
+            
+            // No cached uid. Allocate one, and locate and cache all inputs.
             if (uid==-1) {
                 uid = LogicalExpression.getNextUid();
-                for (Operator input : inputs) {
+                for (LogicalSchema inputSchema : inputSchemas) {
                     long inputUid;
                     LogicalFieldSchema matchedInputFieldSchema;
                 	if (onSchema) {
-                	    matchedInputFieldSchema = ((LogicalRelationalOperator)input).getSchema().getFieldSubNameMatch(s0.getField(i).alias);
+                	    matchedInputFieldSchema = inputSchema.getFieldSubNameMatch(mergedSchema.getField(i).alias);
                         if (matchedInputFieldSchema!=null) {
                             inputUid = matchedInputFieldSchema.uid;
                             uidMapping.add(new Pair<Long, Long>(uid, inputUid));
@@ -129,10 +145,9 @@ public class LOUnion extends LogicalRela
                     }
                     else {
                         matchedInputFieldSchema = mergedSchema.getField(i);
-	                	inputUid = ((LogicalRelationalOperator)input).getSchema().getField(i).uid;
+	                	inputUid = inputSchema.getField(i).uid;
 	                	uidMapping.add(new Pair<Long, Long>(uid, inputUid));
                     }
-                	
                 }
             }
 
@@ -145,15 +160,15 @@ public class LOUnion extends LogicalRela
     /**
      * create schema for union-onschema
      */
-    private LogicalSchema createMergedSchemaOnAlias(List<Operator> ops)
+    private LogicalSchema createMergedSchemaOnAlias(List<LogicalSchema> inputSchemas,
+            List<String> inputAliases) 
     throws FrontendException {
         ArrayList<LogicalSchema> schemas = new ArrayList<LogicalSchema>();
-        for( Operator op : ops ){
-            LogicalRelationalOperator lop = (LogicalRelationalOperator)op;
-            LogicalSchema sch = lop.getSchema();
+        for (int i = 0; i < inputSchemas.size(); i++){
+            LogicalSchema sch = inputSchemas.get(i);
             for( LogicalFieldSchema fs : sch.getFields() ) {
                 if(fs.alias == null){
-                    String msg = "Schema of relation " + lop.getAlias()
+                    String msg = "Schema of relation " + inputAliases.get(i)
                         + " has a null fieldschema for column(s). Schema :" + sch.toString(false);
                     throw new FrontendException( this, msg, 1116, PigException.INPUT );
                 }
@@ -174,6 +189,19 @@ public class LOUnion extends LogicalRela
         return mergedSchema;
     }
     
+    private long getCachedOuputUid(long inputUid) {
+        long uid = -1;
+        
+        for (Pair<Long, Long> pair : uidMapping) {
+            if (pair.second==inputUid) {
+                uid = pair.first;
+                break;
+            }
+        }
+        
+        return uid;
+    }
+
     @Override
     public void accept(PlanVisitor v) throws FrontendException {
         if (!(v instanceof LogicalRelationalNodesVisitor)) {

Modified: pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java?rev=1604517&r1=1604516&r2=1604517&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestUnionOnSchema.java Sun Jun 22 03:11:17 2014
@@ -31,6 +31,8 @@ import org.apache.pig.EvalFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
@@ -51,6 +53,7 @@ public class TestUnionOnSchema  {
     private static final String INP_FILE_2NUMS = "TestUnionOnSchemaInput1";
     private static final String INP_FILE_2NUM_1CHAR_1BAG = "TestUnionOnSchemaInput2";
     private static final String INP_FILE_EMPTY= "TestUnionOnSchemaInput3";
+    private static final String INP_FILE_3NUMS = "TestUnionOnSchemaInput4";
     
     @Before
     public void setUp() throws Exception {
@@ -77,6 +80,11 @@ public class TestUnionOnSchema  {
 
         //3rd input - empty file
         Util.createLocalInputFile(INP_FILE_EMPTY, new String[0]);
+        
+        // 4th input
+        String[] input4 = {"1\t2\t3","4\t5\t6",};
+        Util.createLocalInputFile(INP_FILE_3NUMS, input4);
+
     }
     
     @AfterClass
@@ -449,6 +457,45 @@ public class TestUnionOnSchema  {
         Util.checkQueryOutputsAfterSort(it, expectedRes);
     }
     
+    @Test
+    public void testUnionOnSchemaAdditionalColumnsWithImplicitSplit() throws IOException {
+        PigServer pig = new PigServer(ExecType.LOCAL);
+        Data data = Storage.resetData(pig);
+        
+        // Use batch to force multiple outputs from relation l3. This causes 
+        // ImplicitSplitInsertVisitor to call SchemaResetter. 
+        pig.setBatchOn();
+        
+        String query =
+            "  l1 = load '" + INP_FILE_2NUMS + "' as (i : int, j: int);"
+            + "l2 = load '" + INP_FILE_3NUMS + "' as (i : int, j : int, k : int);" 
+            + "l3 = load '" + INP_FILE_EMPTY + "' as (i : int, j : int, k : int, l :int);"
+            + "u = union onschema l1, l2, l3;"
+            + "store u into 'out1' using mock.Storage;"
+            + "store l3 into 'out2' using mock.Storage;"
+        ;
+
+        Util.registerMultiLineQuery(pig, query);
+        
+        pig.executeBatch();
+        
+        
+        List<Tuple> list1 = data.get("out1");
+        List<Tuple> list2 = data.get("out2");
+        
+        List<Tuple> expectedRes = 
+                Util.getTuplesFromConstantTupleStrings(
+                        new String[] {
+                                "(1,2,null,null)",
+                                "(5,3,null,null)",
+                                "(1,2,3,null)",
+                                "(4,5,6,null)",
+                        });
+        
+        Util.compareActualAndExpectedResults(list1, expectedRes);
+        
+        assertEquals(0, list2.size());
+    }
     
     /**
      * Test UNION ONSCHEMA on 3 inputs 



Mime
View raw message