asterixdb-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From AsterixDB Code Review <do-not-re...@asterix-gerrit.ics.uci.edu>
Subject Change in asterixdb[mad-hatter]: [ASTERIXDB-2857][RT] Incorrect result for nested loop outer join
Date Mon, 07 Jun 2021 22:46:40 GMT
>From Dmitry Lychagin <dmitry.lychagin@couchbase.com>:

Dmitry Lychagin has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11825
)


Change subject: [ASTERIXDB-2857][RT] Incorrect result for nested loop outer join
......................................................................

[ASTERIXDB-2857][RT] Incorrect result for nested loop outer join

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Fix incorrect number of unmatched tuples emitted
  by nested loop implementation of left outer join
- Add RunFileWriter.eraseClosed() method

Change-Id: Id48c2f25acf85e61bb1112811ecfe544a0edae86
---
A asterixdb/asterix-app/src/test/resources/optimizerts/queries/leftouterjoin/query-ASTERIXDB-2857.sqlpp
A asterixdb/asterix-app/src/test/resources/optimizerts/results/leftouterjoin/query-ASTERIXDB-2857.plan
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.1.ddl.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.2.update.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.3.query.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.3.adm
M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
10 files changed, 326 insertions(+), 72 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/25/11825/1

diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/queries/leftouterjoin/query-ASTERIXDB-2857.sqlpp
b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/leftouterjoin/query-ASTERIXDB-2857.sqlpp
new file mode 100644
index 0000000..2304349
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/queries/leftouterjoin/query-ASTERIXDB-2857.sqlpp
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Test nested loop implementation of left outer join
+ */
+
+drop  dataverse test if exists;
+create  dataverse test;
+
+use test;
+
+create type tenkType as closed {
+  unique1         : integer,
+  unique2         : integer,
+  two             : integer,
+  four            : integer,
+  ten             : integer,
+  twenty          : integer,
+  hundred         : integer,
+  thousand        : integer,
+  twothousand     : integer,
+  fivethous       : integer,
+  tenthous        : integer,
+  odd100          : integer,
+  even100         : integer,
+  stringu1        : string,
+  stringu2        : string,
+  string4         : string
+};
+
+create dataset tenk(tenkType) primary key unique2;
+
+SELECT
+   t0.unique1 AS t0_unique1,
+   t1.unique1 AS t1_unique1,
+   t2.unique1 AS t2_unique1
+FROM (
+   SELECT unique1, unique2 FROM tenk WHERE unique2 < 2
+) t0
+INNER JOIN (
+   SELECT unique1, unique2 FROM tenk WHERE unique2 < 4
+) t1 ON t0.unique2 = t1.unique2
+LEFT JOIN (
+   SELECT unique1, unique2 FROM tenk WHERE unique2 < 6
+) t2 ON t0.unique2 + t2.unique2 = 2 * t1.unique2
+ORDER BY t0_unique1, t1_unique1, t2_unique1;
diff --git a/asterixdb/asterix-app/src/test/resources/optimizerts/results/leftouterjoin/query-ASTERIXDB-2857.plan
b/asterixdb/asterix-app/src/test/resources/optimizerts/results/leftouterjoin/query-ASTERIXDB-2857.plan
new file mode 100644
index 0000000..393f1db
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/optimizerts/results/leftouterjoin/query-ASTERIXDB-2857.plan
@@ -0,0 +1,39 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- SORT_MERGE_EXCHANGE [$$136(ASC), $$137(ASC), $#3(ASC) ]  |PARTITIONED|
+          -- STABLE_SORT [$$136(ASC), $$137(ASC), $#3(ASC)]  |PARTITIONED|
+            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+              -- STREAM_PROJECT  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- NESTED_LOOP  |PARTITIONED|
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- HYBRID_HASH_JOIN [$$127][$$128]  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- BTREE_SEARCH (test.tenk.tenk)  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- ASSIGN  |PARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- STREAM_PROJECT  |PARTITIONED|
+                                  -- ASSIGN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- BTREE_SEARCH (test.tenk.tenk)  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- ASSIGN  |PARTITIONED|
+                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                    -- BROADCAST_EXCHANGE  |PARTITIONED|
+                      -- STREAM_PROJECT  |PARTITIONED|
+                        -- ASSIGN  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- BTREE_SEARCH (test.tenk.tenk)  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- ASSIGN  |PARTITIONED|
+                                  -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.1.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.1.ddl.sqlpp
new file mode 100644
index 0000000..87b5d75
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.1.ddl.sqlpp
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Test nested loop implementation of left outer join
+ */
+
+drop  dataverse test if exists;
+create  dataverse test;
+
+use test;
+
+create type tenkType as closed {
+  unique1         : integer,
+  unique2         : integer,
+  two             : integer,
+  four            : integer,
+  ten             : integer,
+  twenty          : integer,
+  hundred         : integer,
+  thousand        : integer,
+  twothousand     : integer,
+  fivethous       : integer,
+  tenthous        : integer,
+  odd100          : integer,
+  even100         : integer,
+  stringu1        : string,
+  stringu2        : string,
+  string4         : string
+};
+
+create dataset tenk(tenkType) primary key unique2;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.2.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.2.update.sqlpp
new file mode 100644
index 0000000..2d7e768
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.2.update.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use test;
+
+load  dataset tenk using localfs ((`path`=`asterix_nc1://data/tenk.tbl`),(`format`=`delimited-text`),(`delimiter`=`|`));
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.3.query.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.3.query.sqlpp
new file mode 100644
index 0000000..823a540
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.3.query.sqlpp
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Test nested loop implementation of left outer join
+ */
+
+USE test;
+
+SELECT
+   t0.unique1 AS t0_unique1,
+   t1.unique1 AS t1_unique1,
+   t2.unique1 AS t2_unique1
+FROM (
+   SELECT unique1, unique2 FROM tenk WHERE unique2 < 2
+) t0
+INNER JOIN (
+   SELECT unique1, unique2 FROM tenk WHERE unique2 < 4
+) t1 ON t0.unique2 = t1.unique2
+LEFT JOIN (
+   SELECT unique1, unique2 FROM tenk WHERE unique2 < 6
+) t2 ON t0.unique2 + t2.unique2 = 2 * t1.unique2
+ORDER BY t0_unique1, t1_unique1, t2_unique1;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.3.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.3.adm
new file mode 100644
index 0000000..1a31db8
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/leftouterjoin/query-ASTERIXDB-2857/query-ASTERIXDB-2857.3.adm
@@ -0,0 +1,2 @@
+{ "t0_unique1": 1891, "t1_unique1": 1891, "t2_unique1": 1891 }
+{ "t0_unique1": 8800, "t1_unique1": 8800, "t2_unique1": 8800 }
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index ce1303b..ed06764 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -11892,6 +11892,11 @@
       </compilation-unit>
     </test-case>
     <test-case FilePath="leftouterjoin">
+      <compilation-unit name="query-ASTERIXDB-2857">
+        <output-dir compare="Text">query-ASTERIXDB-2857</output-dir>
+      </compilation-unit>
+    </test-case>
+    <test-case FilePath="leftouterjoin">
       <compilation-unit name="right_branch_opt_1">
         <output-dir compare="Text">right_branch_opt_1</output-dir>
       </compilation-unit>
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
index dabdd4f..c370b58 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/RunFileWriter.java
@@ -75,9 +75,15 @@
     }
 
     public void erase() throws HyracksDataException {
-        close();
-        file.delete();
+        try {
+            close();
+        } finally {
+            eraseClosed();
+        }
+    }
 
+    public void eraseClosed() {
+        file.delete();
         // Make sure we never access the file if it is deleted.
         file = null;
         handle = null;
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index 2eae25c..dbc91c2 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -20,6 +20,7 @@
 
 import java.io.DataOutput;
 import java.nio.ByteBuffer;
+import java.util.BitSet;
 
 import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -28,6 +29,7 @@
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
 import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
 import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
+import org.apache.hyracks.api.exceptions.ErrorCode;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
@@ -43,6 +45,14 @@
 import org.apache.hyracks.dataflow.std.buffermanager.VariableFramePool;
 
 public class NestedLoopJoin {
+    // Note: Min memory budget should be less than {@code AbstractJoinPOperator.MIN_FRAME_LIMIT_FOR_JOIN}
+    // Inner join: 1 frame for the outer input side, 1 frame for the inner input side, 1
frame for the output
+    private static final int MIN_FRAME_BUDGET_INNER_JOIN = 3;
+    // Outer join extra: Add 1 frame for the {@code outerMatchLOJ} bitset
+    private static final int MIN_FRAME_BUDGET_OUTER_JOIN = MIN_FRAME_BUDGET_INNER_JOIN +
1;
+    // Outer join needs 1 bit per each tuple in the outer side buffer
+    private static final int ESTIMATE_AVG_TUPLE_SIZE = 128;
+
     private final FrameTupleAccessor accessorInner;
     private final FrameTupleAccessor accessorOuter;
     private final FrameTupleAppender appender;
@@ -54,30 +64,45 @@
     private final boolean isLeftOuter;
     private final ArrayTupleBuilder missingTupleBuilder;
     private final IPredicateEvaluator predEvaluator;
-    private boolean isReversed; //Added for handling correct calling for predicate-evaluator
upon recursive calls (in OptimizedHybridHashJoin) that cause role-reversal
+    // Added for handling correct calling for predicate-evaluator upon recursive calls
+    // (in OptimizedHybridHashJoin) that cause role-reversal
+    private final boolean isReversed;
     private final BufferInfo tempInfo = new BufferInfo(null, -1, -1);
+    private final BitSet outerMatchLOJ;
 
     public NestedLoopJoin(IHyracksJobletContext jobletContext, FrameTupleAccessor accessorOuter,
-            FrameTupleAccessor accessorInner, int memSize, IPredicateEvaluator predEval,
boolean isLeftOuter,
+            FrameTupleAccessor accessorInner, int memBudgetInFrames, IPredicateEvaluator
predEval, boolean isLeftOuter,
             IMissingWriter[] missingWriters) throws HyracksDataException {
+        this(jobletContext, accessorOuter, accessorInner, memBudgetInFrames, predEval, isLeftOuter,
missingWriters,
+                false);
+    }
+
+    public NestedLoopJoin(IHyracksJobletContext jobletContext, FrameTupleAccessor accessorOuter,
+            FrameTupleAccessor accessorInner, int memBudgetInFrames, IPredicateEvaluator
predEval, boolean isLeftOuter,
+            IMissingWriter[] missingWriters, boolean isReversed) throws HyracksDataException
{
         this.accessorInner = accessorInner;
         this.accessorOuter = accessorOuter;
         this.appender = new FrameTupleAppender();
         this.outBuffer = new VSizeFrame(jobletContext);
         this.innerBuffer = new VSizeFrame(jobletContext);
         this.appender.reset(outBuffer, true);
-        if (memSize < 3) {
-            throw new HyracksDataException("Not enough memory is available for Nested Loop
Join");
+
+        int minMemBudgetInFrames = isLeftOuter ? MIN_FRAME_BUDGET_OUTER_JOIN : MIN_FRAME_BUDGET_INNER_JOIN;
+        if (memBudgetInFrames < minMemBudgetInFrames) {
+            throw new HyracksDataException(ErrorCode.INSUFFICIENT_MEMORY);
         }
+        int outerBufferMngrMemBudgetInFrames = memBudgetInFrames - minMemBudgetInFrames +
1;
+        int outerBufferMngrMemBudgetInBytes = jobletContext.getInitialFrameSize() * outerBufferMngrMemBudgetInFrames;
         this.outerBufferMngr = new VariableFrameMemoryManager(
-                new VariableFramePool(jobletContext, jobletContext.getInitialFrameSize()
* (memSize - 2)),
-                FrameFreeSlotPolicyFactory.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT,
memSize - 2));
+                new VariableFramePool(jobletContext, outerBufferMngrMemBudgetInBytes), FrameFreeSlotPolicyFactory
+                        .createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT, outerBufferMngrMemBudgetInFrames));
 
         this.predEvaluator = predEval;
-        this.isReversed = false;
-
         this.isLeftOuter = isLeftOuter;
         if (isLeftOuter) {
+            if (isReversed) {
+                throw new HyracksDataException(ErrorCode.INVALID_OPERATOR_OPERATION, "reverse",
"outer join");
+            }
             int innerFieldCount = this.accessorInner.getFieldCount();
             missingTupleBuilder = new ArrayTupleBuilder(innerFieldCount);
             DataOutput out = missingTupleBuilder.getDataOutput();
@@ -85,9 +110,14 @@
                 missingWriters[i].writeMissing(out);
                 missingTupleBuilder.addFieldEndOffset();
             }
+            // Outer join needs 1 bit per each tuple in the outer side buffer
+            int outerMatchLOJCardinalityEstimate = outerBufferMngrMemBudgetInBytes / ESTIMATE_AVG_TUPLE_SIZE;
+            outerMatchLOJ = new BitSet(Math.max(outerMatchLOJCardinalityEstimate, 1));
         } else {
             missingTupleBuilder = null;
+            outerMatchLOJ = null;
         }
+        this.isReversed = isReversed;
 
         FileReference file =
                 jobletContext.createManagedWorkspaceFile(this.getClass().getSimpleName()
+ this.toString());
@@ -110,23 +140,7 @@
 
     public void join(ByteBuffer outerBuffer, IFrameWriter writer) throws HyracksDataException
{
         if (outerBufferMngr.insertFrame(outerBuffer) < 0) {
-            RunFileReader runFileReader = runFileWriter.createReader();
-            try {
-                runFileReader.open();
-                if (runFileReader.nextFrame(innerBuffer)) {
-                    do {
-                        for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
-                            blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(),
writer);
-                        }
-                    } while (runFileReader.nextFrame(innerBuffer));
-                } else if (isLeftOuter) {
-                    for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
-                        appendMissing(outerBufferMngr.getFrame(i, tempInfo), writer);
-                    }
-                }
-            } finally {
-                runFileReader.close();
-            }
+            multiBlockJoin(writer);
             outerBufferMngr.reset();
             if (outerBufferMngr.insertFrame(outerBuffer) < 0) {
                 throw new HyracksDataException("The given outer frame of size:" + outerBuffer.capacity()
@@ -135,16 +149,51 @@
         }
     }
 
-    private void blockJoin(BufferInfo outerBufferInfo, ByteBuffer innerBuffer, IFrameWriter
writer)
-            throws HyracksDataException {
-        accessorOuter.reset(outerBufferInfo.getBuffer(), outerBufferInfo.getStartOffset(),
outerBufferInfo.getLength());
-        accessorInner.reset(innerBuffer);
-        int tupleCount0 = accessorOuter.getTupleCount();
-        int tupleCount1 = accessorInner.getTupleCount();
+    private void multiBlockJoin(IFrameWriter writer) throws HyracksDataException {
+        int outerBufferFrameCount = outerBufferMngr.getNumFrames();
+        if (outerBufferFrameCount == 0) {
+            return;
+        }
+        RunFileReader runFileReader = runFileWriter.createReader();
+        try {
+            runFileReader.open();
+            if (isLeftOuter) {
+                outerMatchLOJ.clear();
+            }
+            while (runFileReader.nextFrame(innerBuffer)) {
+                int outerTupleRunningCount = 0;
+                for (int i = 0; i < outerBufferFrameCount; i++) {
+                    BufferInfo outerBufferInfo = outerBufferMngr.getFrame(i, tempInfo);
+                    accessorOuter.reset(outerBufferInfo.getBuffer(), outerBufferInfo.getStartOffset(),
+                            outerBufferInfo.getLength());
+                    int outerTupleCount = accessorOuter.getTupleCount();
+                    accessorInner.reset(innerBuffer.getBuffer());
+                    blockJoin(outerTupleRunningCount, writer);
+                    outerTupleRunningCount += outerTupleCount;
+                }
+            }
+            if (isLeftOuter) {
+                int outerTupleRunningCount = 0;
+                for (int i = 0; i < outerBufferFrameCount; i++) {
+                    BufferInfo outerBufferInfo = outerBufferMngr.getFrame(i, tempInfo);
+                    accessorOuter.reset(outerBufferInfo.getBuffer(), outerBufferInfo.getStartOffset(),
+                            outerBufferInfo.getLength());
+                    int outerFrameTupleCount = accessorOuter.getTupleCount();
+                    appendMissing(outerTupleRunningCount, outerFrameTupleCount, writer);
+                    outerTupleRunningCount += outerFrameTupleCount;
+                }
+            }
+        } finally {
+            runFileReader.close();
+        }
+    }
 
-        for (int i = 0; i < tupleCount0; ++i) {
+    private void blockJoin(int outerTupleStartPos, IFrameWriter writer) throws HyracksDataException
{
+        int outerTupleCount = accessorOuter.getTupleCount();
+        int innerTupleCount = accessorInner.getTupleCount();
+        for (int i = 0; i < outerTupleCount; ++i) {
             boolean matchFound = false;
-            for (int j = 0; j < tupleCount1; ++j) {
+            for (int j = 0; j < innerTupleCount; ++j) {
                 int c = tpComparator.compare(accessorOuter, i, accessorInner, j);
                 boolean prdEval = evaluatePredicate(i, j);
                 if (c == 0 && prdEval) {
@@ -152,13 +201,8 @@
                     appendToResults(i, j, writer);
                 }
             }
-
-            if (!matchFound && isLeftOuter) {
-                final int[] ntFieldEndOffsets = missingTupleBuilder.getFieldEndOffsets();
-                final byte[] ntByteArray = missingTupleBuilder.getByteArray();
-                final int ntSize = missingTupleBuilder.getSize();
-                FrameUtils.appendConcatToWriter(writer, appender, accessorOuter, i, ntFieldEndOffsets,
ntByteArray, 0,
-                        ntSize);
+            if (isLeftOuter && matchFound) {
+                outerMatchLOJ.set(outerTupleStartPos + i);
             }
         }
     }
@@ -184,15 +228,18 @@
         FrameUtils.appendConcatToWriter(writer, appender, accessor1, tupleId1, accessor2,
tupleId2);
     }
 
-    private void appendMissing(BufferInfo outerBufferInfo, IFrameWriter writer) throws HyracksDataException
{
-        accessorOuter.reset(outerBufferInfo.getBuffer(), outerBufferInfo.getStartOffset(),
outerBufferInfo.getLength());
-        int tupleCount = accessorOuter.getTupleCount();
-        for (int i = 0; i < tupleCount; ++i) {
-            final int[] ntFieldEndOffsets = missingTupleBuilder.getFieldEndOffsets();
-            final byte[] ntByteArray = missingTupleBuilder.getByteArray();
-            final int ntSize = missingTupleBuilder.getSize();
-            FrameUtils.appendConcatToWriter(writer, appender, accessorOuter, i, ntFieldEndOffsets,
ntByteArray, 0,
-                    ntSize);
+    private void appendMissing(int outerFrameMngrStartPos, int outerFrameTupleCount, IFrameWriter
writer)
+            throws HyracksDataException {
+        int limit = outerFrameMngrStartPos + outerFrameTupleCount;
+        for (int outerTuplePos =
+                outerMatchLOJ.nextClearBit(outerFrameMngrStartPos); outerTuplePos < limit;
outerTuplePos =
+                        outerMatchLOJ.nextClearBit(outerTuplePos + 1)) {
+            int[] ntFieldEndOffsets = missingTupleBuilder.getFieldEndOffsets();
+            byte[] ntByteArray = missingTupleBuilder.getByteArray();
+            int ntSize = missingTupleBuilder.getSize();
+            int outerAccessorTupleIndex = outerTuplePos - outerFrameMngrStartPos;
+            FrameUtils.appendConcatToWriter(writer, appender, accessorOuter, outerAccessorTupleIndex,
ntFieldEndOffsets,
+                    ntByteArray, 0, ntSize);
         }
     }
 
@@ -203,22 +250,10 @@
     }
 
     public void completeJoin(IFrameWriter writer) throws HyracksDataException {
-        RunFileReader runFileReader = runFileWriter.createDeleteOnCloseReader();
         try {
-            runFileReader.open();
-            if (runFileReader.nextFrame(innerBuffer)) {
-                do {
-                    for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
-                        blockJoin(outerBufferMngr.getFrame(i, tempInfo), innerBuffer.getBuffer(),
writer);
-                    }
-                } while (runFileReader.nextFrame(innerBuffer));
-            } else if (isLeftOuter) {
-                for (int i = 0; i < outerBufferMngr.getNumFrames(); i++) {
-                    appendMissing(outerBufferMngr.getFrame(i, tempInfo), writer);
-                }
-            }
+            multiBlockJoin(writer);
         } finally {
-            runFileReader.close();
+            runFileWriter.eraseClosed();
         }
         appender.write(writer, true);
     }
@@ -226,8 +261,4 @@
     public void releaseMemory() throws HyracksDataException {
         outerBufferMngr.reset();
     }
-
-    public void setIsReversed(boolean b) {
-        this.isReversed = b;
-    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 97f9c24..11d2c5a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -795,11 +795,10 @@
                     // The nested loop join result is outer + inner. All the other operator
is probe + build.
                     // Hence the reverse relation is different.
                     boolean isReversed = outerRd == buildRd && innerRd == probeRd;
-                    assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse
roles";
                     ITuplePairComparator nljComptorOuterInner = isReversed ? buildComp :
probComp;
                     NestedLoopJoin nlj = new NestedLoopJoin(jobletCtx, new FrameTupleAccessor(outerRd),
-                            new FrameTupleAccessor(innerRd), memorySize, predEvaluator, isLeftOuter,
nonMatchWriter);
-                    nlj.setIsReversed(isReversed);
+                            new FrameTupleAccessor(innerRd), memorySize, predEvaluator, isLeftOuter,
nonMatchWriter,
+                            isReversed);
                     nlj.setComparator(nljComptorOuterInner);
 
                     IFrame cacheBuff = new VSizeFrame(jobletCtx);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11825
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: mad-hatter
Gerrit-Change-Id: Id48c2f25acf85e61bb1112811ecfe544a0edae86
Gerrit-Change-Number: 11825
Gerrit-PatchSet: 1
Gerrit-Owner: Dmitry Lychagin <dmitry.lychagin@couchbase.com>
Gerrit-MessageType: newchange

Mime
View raw message