calcite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vladimirsitni...@apache.org
Subject [1/3] incubator-calcite git commit: [CALCITE-483][CALCITE-489] Update Correlate mechanics and implement EnumerableCorrelate (aka nested loops join)
Date Fri, 12 Dec 2014 07:27:02 GMT
Repository: incubator-calcite
Updated Branches:
  refs/heads/master 8e196a41c -> 696da1685


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
----------------------------------------------------------------------
diff --git a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
index 4958495..cba8924 100644
--- a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
@@ -290,13 +290,13 @@ ProjectRel(EXPR$0=[_ISO-8859-1'a'], EXPR$1=[$SLICE($2)])
         <Resource name="plan">
             <![CDATA[
 LogicalProject(EXPR$0=['abc'], EXPR$1=[$SLICE($9)])
-  Correlator(condition=[true], joinType=[inner], correlations=[[var0=offset7, var1=offset5]])
+  LogicalCorrelate(correlation=[$cor0], joinType=[INNER], requiredColumns=[{5, 7}])
     LogicalTableScan(table=[[CATALOG, SALES, EMP]])
     Collect(field=[EXPR$0])
       LogicalUnion(all=[true])
         LogicalProject(EXPR$0=[$cor0.DEPTNO])
           LogicalValues(tuples=[[{ 0 }]])
-        LogicalProject(EXPR$0=[$cor1.SAL])
+        LogicalProject(EXPR$0=[$cor0.SAL])
           LogicalValues(tuples=[[{ 0 }]])
 ]]>
         </Resource>
@@ -321,7 +321,7 @@ ProjectRel(EXPR$0=[_ISO-8859-1'abc'], EXPR$1=[$SLICE($8)])
         <Resource name="plan">
             <![CDATA[
 LogicalProject(DEPTNO=[$0], NAME=[$1], EMPSET=[$2])
-  Correlator(condition=[true], joinType=[inner], correlations=[[var0=offset0]])
+  LogicalCorrelate(correlation=[$cor0], joinType=[INNER], requiredColumns=[{0}])
     LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
     Collect(field=[EXPR$0])
       LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5],
COMM=[$6], DEPTNO=[$7], SLACKER=[$8])
@@ -379,7 +379,7 @@ ProjectRel(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],
SAL=[$5],
             <![CDATA[
 LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6],
DEPTNO=[$7], SLACKER=[$8])
   LogicalFilter(condition=[IS NOT NULL($9)])
-    Correlator(condition=[true], joinType=[left], correlations=[[var0=offset7]])
+    LogicalCorrelate(correlation=[$cor0], joinType=[LEFT], requiredColumns=[{7}])
       LogicalTableScan(table=[[CATALOG, SALES, EMP]])
       LogicalAggregate(group=[{}], agg#0=[MIN($0)])
         LogicalProject($f0=[true])
@@ -410,7 +410,7 @@ ProjectRel(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],
SAL=[$5],
 LogicalProject(EXPR$0=[$0])
   Uncollect
     LogicalProject(EXPR$0=[$SLICE($2)])
-      Correlator(condition=[true], joinType=[inner], correlations=[[var0=offset0]])
+      LogicalCorrelate(correlation=[$cor0], joinType=[INNER], requiredColumns=[{0}])
         LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
         Collect(field=[EXPR$0])
           LogicalUnion(all=[true])
@@ -440,7 +440,7 @@ ProjectRel(EXPR$0=[$0])
         <Resource name="plan">
             <![CDATA[
 LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6],
DEPTNO=[$7], SLACKER=[$8], DEPTNO0=[$9], NAME=[$10])
-  Correlator(condition=[true], joinType=[inner], correlations=[[var0=offset7]])
+  LogicalCorrelate(correlation=[$cor0], joinType=[INNER], requiredColumns=[{7}])
     LogicalTableScan(table=[[CATALOG, SALES, EMP]])
     LogicalProject(DEPTNO=[$0], NAME=[$1])
       LogicalFilter(condition=[=($cor0.DEPTNO, $0)])
@@ -1595,14 +1595,14 @@ where exists (
             <![CDATA[
 LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6],
DEPTNO=[$7], SLACKER=[$8])
   LogicalFilter(condition=[IS NOT NULL($9)])
-    Correlator(condition=[true], joinType=[left], correlations=[[var1=offset7, var0=offset7]])
+    LogicalCorrelate(correlation=[$cor1], joinType=[LEFT], requiredColumns=[{7}])
       LogicalTableScan(table=[[CATALOG, SALES, EMP]])
       LogicalAggregate(group=[{}], agg#0=[MIN($0)])
         LogicalProject($f0=[true])
           LogicalProject(EXPR$0=[1])
             LogicalFilter(condition=[<=($0, $cor1.DEPTNO)])
               LogicalProject(DEPTNO=[$0], NAME=[$1])
-                LogicalFilter(condition=[>=($0, $cor0.DEPTNO)])
+                LogicalFilter(condition=[>=($0, $cor1.DEPTNO)])
                   LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
 ]]>
         </Resource>
@@ -1688,6 +1688,84 @@ LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],
SAL=[$
 ]]>
         </Resource>
     </TestCase>
+    <TestCase name="testNestedCorrelations">
+        <Resource name="sql">
+            <![CDATA[select * from (select 2+deptno d2, 3+deptno d3 from emp) e
+ where exists (select 1 from (select deptno+1 d1 from dept) d
+ where d1=e.d2 and exists (select 2 from (select deptno+4 d4, deptno+5 d5, deptno+6 d6 from
dept)
+ where d4=d.d1 and d5=d.d1 and d6=e.d3)))]]>
+        </Resource>
+        <Resource name="plan">
+            <![CDATA[
+LogicalProject(D2=[$0], D3=[$1])
+  LogicalFilter(condition=[IS NOT NULL($2)])
+    LogicalCorrelate(correlation=[$cor3], joinType=[LEFT], requiredColumns=[{0, 1}])
+      LogicalProject(D2=[+(2, $7)], D3=[+(3, $7)])
+        LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+      LogicalAggregate(group=[{}], agg#0=[MIN($0)])
+        LogicalProject($f0=[true])
+          LogicalProject(EXPR$0=[1])
+            LogicalFilter(condition=[AND(=($0, $cor3.D2), IS NOT NULL($1))])
+              LogicalCorrelate(correlation=[$cor0], joinType=[LEFT], requiredColumns=[{0}])
+                LogicalProject(D1=[+($0, 1)])
+                  LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+                LogicalAggregate(group=[{}], agg#0=[MIN($0)])
+                  LogicalProject($f0=[true])
+                    LogicalProject(EXPR$0=[2])
+                      LogicalFilter(condition=[AND(=($0, $cor0.D1), =($1, $cor0.D1), =($2,
$cor3.D3))])
+                        LogicalProject(D4=[+($0, 4)], D5=[+($0, 5)], D6=[+($0, 6)])
+                          LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+]]>
+        </Resource>
+    </TestCase>
+    <TestCase name="testNestedCorrelationsDecorrelated">
+        <Resource name="sql">
+            <![CDATA[select * from (select 2+deptno d2, 3+deptno d3 from emp) e
+ where exists (select 1 from (select deptno+1 d1 from dept) d
+ where d1=e.d2 and exists (select 2 from (select deptno+4 d4, deptno+5 d5, deptno+6 d6 from
dept)
+ where d4=d.d1 and d5=d.d1 and d6=e.d3)))]]>
+        </Resource>
+        <Resource name="plan">
+            <![CDATA[
+LogicalProject(D2=[$0], D3=[$1])
+  LogicalFilter(condition=[IS NOT NULL($2)])
+    LogicalProject(D2=[$0], D3=[$1], $f0=[$4])
+      LogicalJoin(condition=[AND(=($0, $2), =($1, $3))], joinType=[left])
+        LogicalProject(D2=[+(2, $7)], D3=[+(3, $7)])
+          LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+        LogicalAggregate(group=[{0, 1}], agg#0=[MIN($2)])
+          LogicalProject($f00=[$1], $f03=[$2], $f0=[$0])
+            LogicalProject($f0=[true], $f00=[$1], $f03=[$2])
+              LogicalProject(EXPR$0=[1], $f00=[$3], $f03=[$2])
+                LogicalFilter(condition=[AND(=($0, $3), IS NOT NULL($1))])
+                  LogicalJoin(condition=[true], joinType=[inner])
+                    LogicalProject(D1=[$0], $f0=[$4], $f03=[$3])
+                      LogicalJoin(condition=[AND(=($0, $1), =($0, $2))], joinType=[left])
+                        LogicalProject(D1=[+($0, 1)])
+                          LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+                        LogicalAggregate(group=[{0, 1, 2}], agg#0=[MIN($3)])
+                          LogicalProject(D1=[$1], D12=[$2], $f03=[$3], $f0=[$0])
+                            LogicalProject($f0=[true], D1=[$1], D12=[$2], $f03=[$3])
+                              LogicalProject(EXPR$0=[2], D1=[$3], D12=[$3], $f0=[$4])
+                                LogicalFilter(condition=[AND(=($0, $3), =($1, $3), =($2,
$4))])
+                                  LogicalJoin(condition=[true], joinType=[inner])
+                                    LogicalProject(D4=[+($0, 4)], D5=[+($0, 5)], D6=[+($0,
6)])
+                                      LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+                                    LogicalJoin(condition=[true], joinType=[inner])
+                                      LogicalAggregate(group=[{0}])
+                                        LogicalProject(D1=[+($0, 1)])
+                                          LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
+                                      LogicalAggregate(group=[{0}])
+                                        LogicalProject($f0=[$1])
+                                          LogicalProject(D2=[+(2, $7)], D3=[+(3, $7)])
+                                            LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+                    LogicalAggregate(group=[{0}])
+                      LogicalProject($f0=[$0])
+                        LogicalProject(D2=[+(2, $7)], D3=[+(3, $7)])
+                          LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+        </Resource>
+    </TestCase>
     <TestCase name="testWithInsideWhereExistsDecorrelate">
         <Resource name="sql">
             <![CDATA[select * from emp
@@ -1700,12 +1778,12 @@ where exists (
 LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6],
DEPTNO=[$7], SLACKER=[$8])
   LogicalFilter(condition=[IS NOT NULL($9)])
     LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6],
DEPTNO=[$7], SLACKER=[$8], $f0=[$11])
-      LogicalJoin(condition=[AND(=($7, $10), =($7, $9))], joinType=[left])
+      LogicalJoin(condition=[AND(=($7, $9), =($7, $10))], joinType=[left])
         LogicalTableScan(table=[[CATALOG, SALES, EMP]])
         LogicalAggregate(group=[{0, 1}], agg#0=[MIN($2)])
-          LogicalProject($f01=[$1], $f00=[$2], $f0=[$0])
-            LogicalProject($f0=[true], $f01=[$1], $f00=[$2])
-              LogicalProject(EXPR$0=[1], $f0=[$2], $f00=[$3])
+          LogicalProject($f00=[$1], $f02=[$2], $f0=[$0])
+            LogicalProject($f0=[true], $f00=[$1], $f02=[$2])
+              LogicalProject(EXPR$0=[1], $f00=[$3], $f0=[$2])
                 LogicalFilter(condition=[<=($0, $3)])
                   LogicalJoin(condition=[true], joinType=[inner])
                     LogicalProject(DEPTNO=[$0], NAME=[$1], $f0=[$2])
@@ -1754,7 +1832,7 @@ LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],
SAL=[$
             <![CDATA[
 LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6],
DEPTNO=[$7], SLACKER=[$8])
   LogicalFilter(condition=[IS NOT NULL($9)])
-    Correlator(condition=[true], joinType=[left], correlations=[[var0=offset7]])
+    LogicalCorrelate(correlation=[$cor0], joinType=[LEFT], requiredColumns=[{7}])
       LogicalTableScan(table=[[CATALOG, SALES, EMP]])
       LogicalAggregate(group=[{}], agg#0=[MIN($0)])
         LogicalProject($f0=[true])
@@ -1775,7 +1853,7 @@ LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],
SAL=[$
 LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6],
DEPTNO=[$7], SLACKER=[$8])
   LogicalFilter(condition=[IS NOT NULL($9)])
     LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$5], COMM=[$6],
DEPTNO=[$7], SLACKER=[$8], $f0=[$9])
-      Correlator(condition=[true], joinType=[left], correlations=[[var0=offset7]])
+      LogicalCorrelate(correlation=[$cor0], joinType=[LEFT], requiredColumns=[{7}])
         LogicalTableScan(table=[[CATALOG, SALES, EMP]])
         LogicalAggregate(group=[{}], agg#0=[MIN($0)])
           LogicalProject($f0=[true])
@@ -1793,7 +1871,7 @@ LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4],
SAL=[$
         <Resource name="plan">
             <![CDATA[
 LogicalProject(DEPTNO=[$0], NAME=[$1], EXPR$0=[$2])
-  Correlator(condition=[true], joinType=[inner], correlations=[[var0=offset0]])
+  LogicalCorrelate(correlation=[$cor0], joinType=[INNER], requiredColumns=[{0}])
     LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
     Uncollect
       LogicalProject(EXPR$0=[$SLICE($0)])

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/linq4j/src/main/java/org/apache/calcite/linq4j/CorrelateJoinType.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/CorrelateJoinType.java b/linq4j/src/main/java/org/apache/calcite/linq4j/CorrelateJoinType.java
new file mode 100644
index 0000000..322567e
--- /dev/null
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/CorrelateJoinType.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.linq4j;
+
+/**
+ * Specifies the type of correlation operation: inner, left, semi, or anti.
+ */
+public enum CorrelateJoinType {
+  /**
+   * Inner join
+   */
+  INNER,
+
+  /**
+   * Left-outer join
+   */
+  LEFT,
+
+  /**
+   * Semi-join
+   * <p>Similar to from A ... where a in (select b from B ...)</p>
+   */
+  SEMI,
+
+  /**
+   * Anti-join
+   * <p>Similar to from A ... where a NOT in (select b from B ...)</p>
+   * <p>Note: if B.b is nullable and B has nulls, no rows must be returned</p>
+   */
+  ANTI;
+}
+
+// End SemiJoinType.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/linq4j/src/main/java/org/apache/calcite/linq4j/DefaultEnumerable.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/DefaultEnumerable.java b/linq4j/src/main/java/org/apache/calcite/linq4j/DefaultEnumerable.java
index a0853bf..c0c2b4c 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/DefaultEnumerable.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/DefaultEnumerable.java
@@ -370,6 +370,13 @@ public abstract class DefaultEnumerable<T> implements OrderedEnumerable<T>
{
         generateNullsOnRight);
   }
 
+  public <TInner, TResult> Enumerable<TResult> correlateJoin(
+      CorrelateJoinType joinType, Function1<T, Enumerable<TInner>> inner,
+      Function2<T, TInner, TResult> resultSelector) {
+    return EnumerableDefaults.correlateJoin(joinType, getThis(), inner,
+        resultSelector);
+  }
+
   public T last() {
     return EnumerableDefaults.last(getThis());
   }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java b/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
index b2f64a7..4be759c 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java
@@ -1033,6 +1033,105 @@ public abstract class EnumerableDefaults {
   }
 
   /**
+   * Returns elements of {@code outer} for which there is a member of
+   * {@code inner} with a matching key. A specified
+   * {@code EqualityComparer<TSource>} is used to compare keys.
+   */
+  public static <TSource, TInner, TResult> Enumerable<TResult> correlateJoin(
+      final CorrelateJoinType joinType, final Enumerable<TSource> outer,
+      final Function1<TSource, Enumerable<TInner>> inner,
+      final Function2<TSource, TInner, TResult> resultSelector) {
+    return new AbstractEnumerable<TResult>() {
+      public Enumerator<TResult> enumerator() {
+        return new Enumerator<TResult>() {
+          private Enumerator<TSource> outerEnumerator = outer.enumerator();
+          private Enumerator<TInner> innerEnumerator;
+          TSource outerValue;
+          TInner innerValue;
+          int state = 0; // 0 -- moving outer, 1 moving inner;
+
+          public TResult current() {
+            return resultSelector.apply(outerValue, innerValue);
+          }
+
+          public boolean moveNext() {
+            while (true) {
+              switch (state) {
+              case 0:
+                // move outer
+                if (!outerEnumerator.moveNext()) {
+                  return false;
+                }
+                outerValue = outerEnumerator.current();
+                // initial move inner
+                Enumerable<TInner> innerEnumerable = inner.apply(outerValue);
+                if (innerEnumerable == null) {
+                  innerEnumerable = Linq4j.emptyEnumerable();
+                }
+                if (innerEnumerator != null) {
+                  innerEnumerator.close();
+                }
+                innerEnumerator = innerEnumerable.enumerator();
+                if (innerEnumerator.moveNext()) {
+                  switch (joinType) {
+                  case ANTI:
+                    // For anti-join need to try next outer row
+                    // Current does not match
+                    continue;
+                  case SEMI:
+                    return true; // current row matches
+                  }
+                  // INNER and LEFT just return result
+                  innerValue = innerEnumerator.current();
+                  state = 1; // iterate over inner results
+                  return true;
+                }
+                // No match detected
+                innerValue = null;
+                switch (joinType) {
+                case LEFT:
+                case ANTI:
+                  return true;
+                }
+                // For INNER and LEFT need to find another outer row
+                continue;
+              case 1:
+                // subsequent move inner
+                if (innerEnumerator.moveNext()) {
+                  innerValue = innerEnumerator.current();
+                  return true;
+                }
+                state = 0;
+                // continue loop, move outer
+              }
+            }
+          }
+
+          public void reset() {
+            state = 0;
+            outerEnumerator.reset();
+            closeInner();
+          }
+
+          public void close() {
+            outerEnumerator.close();
+            closeInner();
+            outerValue = null;
+          }
+
+          private void closeInner() {
+            innerValue = null;
+            if (innerEnumerator != null) {
+              innerEnumerator.close();
+              innerEnumerator = null;
+            }
+          }
+        };
+      }
+    };
+  }
+
+  /**
    * Returns the last element of a sequence. (Defined
    * by Enumerable.)
    */

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/linq4j/src/main/java/org/apache/calcite/linq4j/ExtendedEnumerable.java
----------------------------------------------------------------------
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/ExtendedEnumerable.java b/linq4j/src/main/java/org/apache/calcite/linq4j/ExtendedEnumerable.java
index f657dd7..a5c41c0 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/ExtendedEnumerable.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/ExtendedEnumerable.java
@@ -533,6 +533,19 @@ public interface ExtendedEnumerable<TSource> {
       boolean generateNullsOnLeft, boolean generateNullsOnRight);
 
   /**
+   * For each row of the current enumerable returns the correlated rows
+   * from the {@code inner} enumerable (nested loops join).
+   *
+   * @param joinType inner, left, semi or anti join type
+   * @param inner generator of inner enumerable
+   * @param resultSelector selector of the result. For semi/anti join
+   *                       inner argument is always null.
+   */
+  <TInner, TResult> Enumerable<TResult> correlateJoin(
+      CorrelateJoinType joinType, Function1<TSource, Enumerable<TInner>> inner,
+      Function2<TSource, TInner, TResult> resultSelector);
+
+  /**
    * Returns the last element of a sequence. (Defined
    * by Enumerable.)
    */

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/linq4j/src/test/java/org/apache/calcite/linq4j/test/CorrelateJoinTest.java
----------------------------------------------------------------------
diff --git a/linq4j/src/test/java/org/apache/calcite/linq4j/test/CorrelateJoinTest.java b/linq4j/src/test/java/org/apache/calcite/linq4j/test/CorrelateJoinTest.java
new file mode 100644
index 0000000..6c49453
--- /dev/null
+++ b/linq4j/src/test/java/org/apache/calcite/linq4j/test/CorrelateJoinTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.linq4j.test;
+
+import org.apache.calcite.linq4j.CorrelateJoinType;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.Linq4j;
+import org.apache.calcite.linq4j.function.Function1;
+import org.apache.calcite.linq4j.function.Function2;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Tests {@link org.apache.calcite.linq4j.ExtendedEnumerable#correlateJoin}
+ */
+public class CorrelateJoinTest {
+  static final Function2<Integer, Integer, Integer[]> SELECT_BOTH =
+      new Function2<Integer, Integer, Integer[]>() {
+        public Integer[] apply(Integer v0, Integer v1) {
+          return new Integer[]{v0, v1};
+        }
+      };
+
+  @Test public void testInner() {
+    testJoin(CorrelateJoinType.INNER, new Integer[][]{
+      {2, 20},
+      {3, -30},
+      {3, -60},
+      {20, 200},
+      {30, -300},
+      {30, -600}});
+  }
+
+  @Test public void testLeft() {
+    testJoin(CorrelateJoinType.LEFT, new Integer[][]{
+      {1, null},
+      {2, 20},
+      {3, -30},
+      {3, -60},
+      {10, null},
+      {20, 200},
+      {30, -300},
+      {30, -600}});
+  }
+
+  @Test public void testSemi() {
+    testJoin(CorrelateJoinType.SEMI, new Integer[][]{
+      {2, null},
+      {3, null},
+      {20, null},
+      {30, null}});
+  }
+
+  @Test public void testAnti() {
+    testJoin(CorrelateJoinType.ANTI, new Integer[][]{
+      {1, null},
+      {10, null}});
+  }
+
+  public void testJoin(CorrelateJoinType joinType, Integer[][] expected) {
+    Enumerable<Integer[]> join =
+        Linq4j.asEnumerable(ImmutableList.of(1, 2, 3, 10, 20, 30))
+            .correlateJoin(joinType,
+                new Function1<Integer, Enumerable<Integer>>() {
+                  public Enumerable<Integer> apply(Integer a0) {
+                    if (a0 == 1 || a0 == 10) {
+                      return Linq4j.emptyEnumerable();
+                    }
+                    if (a0 == 2 || a0 == 20) {
+                      return Linq4j.singletonEnumerable(a0 * 10);
+                    }
+                    if (a0 == 3 || a0 == 30) {
+                      return Linq4j.asEnumerable(
+                          ImmutableList.of(-a0 * 10, -a0 * 20));
+                    }
+                    throw new IllegalArgumentException(
+                        "Unexpected input " + a0);
+                  }
+                }, SELECT_BOTH);
+    for (int i = 0; i < 2; i++) {
+      Enumerator<Integer[]> e = join.enumerator();
+      checkResults(e, expected);
+      e.close();
+    }
+  }
+
+  private void checkResults(Enumerator<Integer[]> e, Integer[][] expected) {
+    List<Integer[]> res = Lists.newArrayList();
+    while (e.moveNext()) {
+      res.add(e.current());
+    }
+    Integer[][] actual = res.toArray(new Integer[res.size()][]);
+    assertArrayEquals(expected, actual);
+  }
+}
+
+// End CorrelateJoinTest.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jSuite.java
----------------------------------------------------------------------
diff --git a/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jSuite.java b/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jSuite.java
index fd044cb..b1dd7a3 100644
--- a/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jSuite.java
+++ b/linq4j/src/test/java/org/apache/calcite/linq4j/test/Linq4jSuite.java
@@ -36,7 +36,8 @@ import org.junit.runners.Suite;
     DeterministicTest.class,
     BlockBuilderTest.class,
     FunctionTest.class,
-    TypeTest.class
+    TypeTest.class,
+    CorrelateJoinTest.class
 })
 public class Linq4jSuite {
 }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/spark/src/main/java/org/apache/calcite/adapter/spark/SparkRules.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/calcite/adapter/spark/SparkRules.java b/spark/src/main/java/org/apache/calcite/adapter/spark/SparkRules.java
index e95dc50..0d89f6f 100644
--- a/spark/src/main/java/org/apache/calcite/adapter/spark/SparkRules.java
+++ b/spark/src/main/java/org/apache/calcite/adapter/spark/SparkRules.java
@@ -343,7 +343,7 @@ public abstract class SparkRules {
                 builder2,
                 new RexToLixTranslator.InputGetterImpl(
                     Collections.singletonList(
-                        Pair.of((Expression) e_, result.physType))));
+                        Pair.of((Expression) e_, result.physType))), null);
         builder2.add(
             Expressions.ifThen(
                 Expressions.not(condition),
@@ -360,7 +360,7 @@ public abstract class SparkRules {
               null,
               new RexToLixTranslator.InputGetterImpl(
                   Collections.singletonList(
-                      Pair.of((Expression) e_, result.physType))));
+                      Pair.of((Expression) e_, result.physType))), null);
       builder2.add(
           Expressions.return_(null,
               Expressions.convert_(

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/696da168/spark/src/main/java/org/apache/calcite/adapter/spark/SparkToEnumerableConverter.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/calcite/adapter/spark/SparkToEnumerableConverter.java
b/spark/src/main/java/org/apache/calcite/adapter/spark/SparkToEnumerableConverter.java
index 0e572f1..e270d44 100644
--- a/spark/src/main/java/org/apache/calcite/adapter/spark/SparkToEnumerableConverter.java
+++ b/spark/src/main/java/org/apache/calcite/adapter/spark/SparkToEnumerableConverter.java
@@ -107,7 +107,6 @@ public class SparkToEnumerableConverter
       if (parent != null) {
         assert input == parent.getInputs().get(ordinal);
       }
-      createFrame(parent, ordinal, input);
       return input.implementSpark(this);
     }
 


Mime
View raw message