pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From knogu...@apache.org
Subject svn commit: r1799947 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java test/org/apache/pig/test/TestMultiQuery.java
Date Mon, 26 Jun 2017 16:37:39 GMT
Author: knoguchi
Date: Mon Jun 26 16:37:39 2017
New Revision: 1799947

URL: http://svn.apache.org/viewvc?rev=1799947&view=rev
Log:
PIG-4548: Records Lost With Specific Combination of Commands and Streaming Function (knoguchi)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
    pig/trunk/test/org/apache/pig/test/TestMultiQuery.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1799947&r1=1799946&r2=1799947&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jun 26 16:37:39 2017
@@ -32,6 +32,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-4548: Records Lost With Specific Combination of Commands and Streaming Function (knoguchi)
+
 PIG-5262: Fix jdiff related issues: fail build upon error, correct xml character escaping (szita)
 
 PIG-5225: Several unit tests are not annotated with @Test (nkollar via rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java?rev=1799947&r1=1799946&r2=1799947&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java Mon Jun 26 16:37:39 2017
@@ -272,7 +272,19 @@ public class PODemux extends PhysicalOpe
                 }
             } else {
                 curLeaf = leaf;
-                res = leaf.getNextTuple();
+                while (true) {
+                    res = leaf.getNextTuple();
+
+                    if (res.returnStatus == POStatus.STATUS_OK ||
+                            res.returnStatus == POStatus.STATUS_EOP ||
+                            res.returnStatus == POStatus.STATUS_ERR) {
+                        break;
+                    } else if (res.returnStatus == POStatus.STATUS_NULL) {
+                        // this "else if" is unnecessary but keeping it to
+                        // be sync with runPipeline()
+                        continue;
+                    }
+                }
                
                 if (res.returnStatus == POStatus.STATUS_EOP)  {
                     processedSet.set(idx++);        

Modified: pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=1799947&r1=1799946&r2=1799947&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Mon Jun 26 16:37:39 2017
@@ -65,6 +65,14 @@ public class TestMultiQuery {
         deleteOutputFiles();
     }
 
+    private static final String simpleEchoStreamingCommand;
+    static {
+        if (Util.WINDOWS)
+            simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'";
+        else
+            simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'";
+    }
+
     @Before
     public void setUp() throws Exception {
         deleteOutputFiles();
@@ -955,6 +963,40 @@ public class TestMultiQuery {
                 .translateSchema(myPig.dumpSchema("C2")), Util.isSparkExecType(Util.getLocalTestMode()));
     }
 
+    @Test
+    public void testMultiQueryJiraPig4548() throws Exception {
+        Storage.Data data = Storage.resetData(myPig);
+        data.set("inputLocation",
+                Storage.tuple("1", "f"),
+                Storage.tuple("2", "f"),
+                Storage.tuple("3", "f"),
+                Storage.tuple("4", "f"),
+                Storage.tuple("5", "f"),
+                Storage.tuple("6", "f"));
+        myPig.setBatchOn();
+        myPig.registerQuery("A = load 'inputLocation' using mock.Storage() as (f1:chararray, f2:chararray);");
+        myPig.registerQuery("SPLIT A into T if (f2 == 'T'), F otherwise;");
+        myPig.registerQuery("T2 = group T by f1;");
+        myPig.registerQuery("store T2 into 'output1' using mock.Storage();");
+        myPig.registerQuery("F2 = group F by f1;");
+        myPig.registerQuery("F3 = stream F2 through `" + simpleEchoStreamingCommand
+                            + "` as (group:chararray, F:bag{tuple(f1: chararray,f2: chararray)});");
+        myPig.registerQuery("store F3 into 'output2' using mock.Storage();");
+        myPig.executeBatch();
+
+        List<Tuple> actualResults = data.get("output1");
+        assertEquals("Number of records for output1 should be 0",0,actualResults.size());
+        actualResults = data.get("output2");
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                                         new String[]{"('1',{('1','f')})",
+                                                      "('2',{('2','f')})",
+                                                      "('3',{('3','f')})",
+                                                      "('4',{('4','f')})",
+                                                      "('5',{('5','f')})",
+                                                      "('6',{('6','f')})"});
+        Util.checkQueryOutputsAfterSort(actualResults.iterator(), expectedResults);
+    }
+
     // --------------------------------------------------------------------------
     // Helper methods
 



Mime
View raw message