ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvvinbl...@apache.org
Subject [ignite] branch ignite-12248 updated: query execution
Date Wed, 04 Dec 2019 15:54:50 GMT
This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new 8cdb219  query execution
8cdb219 is described below

commit 8cdb219ecb96bd09c14388f0678e79faf4916c04
Author: Igor Seliverstov <gvvinblade@gmail.com>
AuthorDate: Wed Dec 4 18:54:37 2019 +0300

    query execution
---
 .../processors/query/calcite/exchange/Outbox.java  |   2 +-
 .../query/calcite/exec/AbstractNode.java           |   2 +-
 .../query/calcite/exec/Interpretable.java          | 510 +++++++++++++++++++++
 .../processors/query/calcite/exec/Node.java        |   2 +-
 .../query/calcite/exec/ScalarFactory.java          | 121 -----
 .../processors/query/calcite/exec/ScanNode.java    |  12 +-
 .../query/calcite/metadata/FragmentInfo.java       |  10 +-
 .../calcite/metadata/IgniteMdFragmentInfo.java     |   4 +-
 .../query/calcite/metadata/RelMetadataQueryEx.java |   8 +-
 .../ContextValue.java}                             |  30 +-
 .../query/calcite/prepare/DataContextImpl.java     |  26 +-
 .../rel/{Receiver.java => IgniteReceiver.java}     |   4 +-
 .../calcite/rel/{Sender.java => IgniteSender.java} |  16 +-
 .../query/calcite/rule/PlannerPhase.java           |   8 +
 .../query/calcite/schema/IgniteTable.java          |   9 +-
 .../calcite/serialize/relation/ReceiverNode.java   |   6 +-
 .../serialize/relation/RelToGraphConverter.java    |   8 +-
 .../calcite/serialize/relation/SenderNode.java     |   6 +-
 .../query/calcite/splitter/Fragment.java           |  14 +-
 .../query/calcite/splitter/QueryPlan.java          |   8 +-
 .../query/calcite/splitter/Splitter.java           |  12 +-
 .../query/calcite/util/IgniteRelShuttle.java       |  16 +-
 .../query/calcite/util/RelImplementor.java         |   8 +-
 .../{ScanIterator.java => TableScanIterator.java}  |   4 +-
 .../query/calcite/CalciteQueryProcessorTest.java   | 144 +++++-
 .../query/calcite/exec/ExecutionTest.java          |   4 +-
 26 files changed, 765 insertions(+), 229 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java
index b1cf688..9099b46 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java
@@ -45,7 +45,7 @@ public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Sink<T>
 
     private ExchangeProcessor srvc;
 
-    protected Outbox(GridCacheVersion queryId, long exchangeId, Collection<UUID> targets, DestinationFunction function) {
+    public Outbox(GridCacheVersion queryId, long exchangeId, Collection<UUID> targets, DestinationFunction function) {
         super(Sink.noOp());
         this.queryId = queryId;
         this.exchangeId = exchangeId;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractNode.java
index e832da4..66ba475 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractNode.java
@@ -22,7 +22,7 @@ import java.util.List;
 /**
  *
  */
-public abstract class AbstractNode<T> implements Node<T>, Source {
+public abstract class AbstractNode<T> implements Node<T> {
     protected final Sink<T> target;
     protected List<Source> sources;
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Interpretable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Interpretable.java
new file mode 100644
index 0000000..04108d4
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Interpretable.java
@@ -0,0 +1,510 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.interpreter.Context;
+import org.apache.calcite.interpreter.JaninoRexCompiler;
+import org.apache.calcite.interpreter.Scalar;
+import org.apache.calcite.interpreter.Util;
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.schema.ScannableTable;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.calcite.exchange.Outbox;
+import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.splitter.Target;
+import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
+import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunctionFactory;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.ignite.internal.processors.query.calcite.prepare.ContextValue.PLANNER_CONTEXT;
+import static org.apache.ignite.internal.processors.query.calcite.prepare.ContextValue.QUERY_ID;
+
+/**
+ *
+ */
+public class Interpretable {
+    public static final Convention INTERPRETABLE = new Convention.Impl("INTERPRETABLE", InterRel.class) {};
+
+    public static final List<RelOptRule> RULES = ImmutableList.of(
+        new TableScanConverter(),
+        new JoinConverter(),
+        new ProjectConverter(),
+        new FilterConverter(),
+        new SenderConverter(),
+        new ReceiverConverter()
+    );
+
+    public interface InterRel extends RelNode {
+        <T> Node<T> implement(Implementor<T> implementor);
+    }
+
+    public static class TableScanConverter extends ConverterRule {
+        public TableScanConverter() {
+            super(IgniteTableScan.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "TableScanConverter");
+        }
+
+        @Override public RelNode convert(RelNode rel) {
+            IgniteTableScan scan = (IgniteTableScan) rel;
+
+            RelTraitSet traitSet = scan.getTraitSet().replace(INTERPRETABLE);
+            return new ScanRel(rel.getCluster(), traitSet, scan.getTable());
+        }
+    }
+
+    public static class JoinConverter extends ConverterRule {
+        public JoinConverter() {
+            super(IgniteJoin.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "JoinConverter");
+        }
+
+        @Override public RelNode convert(RelNode rel) {
+            IgniteJoin join = (IgniteJoin) rel;
+
+            RelNode left = convert(join.getLeft(), INTERPRETABLE);
+            RelNode right = convert(join.getRight(), INTERPRETABLE);
+
+            RelTraitSet traitSet = join.getTraitSet().replace(INTERPRETABLE);
+
+            return new JoinRel(rel.getCluster(), traitSet, left, right, join.getCondition(), join.getVariablesSet(), join.getJoinType());
+        }
+    }
+
+    public static class ProjectConverter extends ConverterRule {
+        public ProjectConverter() {
+            super(IgniteProject.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "ProjectConverter");
+        }
+
+        @Override public RelNode convert(RelNode rel) {
+            IgniteProject project = (IgniteProject) rel;
+            RelTraitSet traitSet = project.getTraitSet().replace(INTERPRETABLE);
+            RelNode input = convert(project.getInput(), INTERPRETABLE);
+
+            return new ProjectRel(rel.getCluster(), traitSet, input, project.getProjects(), project.getRowType());
+        }
+    }
+
+    public static class FilterConverter extends ConverterRule {
+        public FilterConverter() {
+            super(IgniteFilter.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "FilterConverter");
+        }
+
+        @Override public RelNode convert(RelNode rel) {
+            IgniteFilter filter = (IgniteFilter) rel;
+            RelTraitSet traitSet = filter.getTraitSet().replace(INTERPRETABLE);
+            RelNode input = convert(filter.getInput(), INTERPRETABLE);
+
+            return new FilterRel(rel.getCluster(), traitSet, input, filter.getCondition());
+        }
+    }
+
+    public static class SenderConverter extends ConverterRule {
+        public SenderConverter() {
+            super(IgniteSender.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "SenderConverter");
+        }
+
+        @Override public RelNode convert(RelNode rel) {
+            IgniteSender sender = (IgniteSender) rel;
+            RelTraitSet traitSet = sender.getTraitSet().replace(INTERPRETABLE);
+            RelNode input = convert(sender.getInput(), INTERPRETABLE);
+
+            return new SenderRel(rel.getCluster(), traitSet, input, sender.target());
+        }
+    }
+
+    public static class ReceiverConverter extends ConverterRule {
+        public ReceiverConverter() {
+            super(IgniteReceiver.class, IgniteRel.IGNITE_CONVENTION, INTERPRETABLE, "ReceiverConverter");
+        }
+
+        @Override public RelNode convert(RelNode rel) {
+            IgniteReceiver sender = (IgniteReceiver) rel;
+            RelTraitSet traitSet = sender.getTraitSet().replace(INTERPRETABLE);
+
+            return new ReceiverRel(rel.getCluster(), traitSet, sender.source());
+        }
+    }
+
+    public static class ReceiverRel extends AbstractRelNode implements InterRel {
+        private final org.apache.ignite.internal.processors.query.calcite.splitter.Source source;
+
+        protected ReceiverRel(RelOptCluster cluster, RelTraitSet traits, org.apache.ignite.internal.processors.query.calcite.splitter.Source source) {
+            super(cluster, traits);
+
+            this.source = source;
+        }
+
+        @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+            return new ReceiverRel(getCluster(), traitSet, source);
+        }
+
+        @Override public <T> Node<T> implement(Implementor<T> implementor) {
+            return implementor.implement(this);
+        }
+    }
+
+    public static class SenderRel extends SingleRel implements InterRel {
+        private final Target target;
+
+        protected SenderRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, @NotNull Target target) {
+            super(cluster, traits, input);
+            this.target = target;
+        }
+
+        @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+            return new SenderRel(getCluster(), traitSet, sole(inputs), target);
+        }
+
+        @Override public <T> Node<T> implement(Implementor<T> implementor) {
+            return implementor.implement(this);
+        }
+    }
+
+    public static class FilterRel extends Filter implements InterRel {
+        protected FilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
+            super(cluster, traits, child, condition);
+        }
+
+        @Override public Filter copy(RelTraitSet traitSet, RelNode input, RexNode condition) {
+            return new FilterRel(getCluster(), traitSet, input, condition);
+        }
+
+        @Override public <T> Node<T> implement(Implementor<T> implementor) {
+            return implementor.implement(this);
+        }
+    }
+
+    public static class ProjectRel extends Project implements InterRel {
+        protected ProjectRel(RelOptCluster cluster, RelTraitSet traits, RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
+            super(cluster, traits, input, projects, rowType);
+        }
+
+        @Override public Project copy(RelTraitSet traitSet, RelNode input, List<RexNode> projects, RelDataType rowType) {
+            return new ProjectRel(getCluster(), traitSet, input, projects, rowType);
+        }
+
+        @Override public <T> Node<T> implement(Implementor<T> implementor) {
+            return implementor.implement(this);
+        }
+    }
+
+    public static class JoinRel extends Join implements InterRel {
+        protected JoinRel(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right, RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
+            super(cluster, traitSet, left, right, condition, variablesSet, joinType);
+        }
+
+        @Override public Join copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
+            return new JoinRel(getCluster(), traitSet, left, right, condition, variablesSet, joinType);
+        }
+
+        @Override public <T> Node<T> implement(Implementor<T> implementor) {
+            return implementor.implement(this);
+        }
+    }
+
+    public static class ScanRel extends TableScan implements InterRel {
+        protected ScanRel(RelOptCluster cluster, RelTraitSet traitSet, RelOptTable table) {
+            super(cluster, traitSet, table);
+        }
+
+        @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+            return this;
+        }
+
+        @Override public <T> Node<T> implement(Implementor<T> implementor) {
+            return implementor.implement(this);
+        }
+    }
+
+    public static class Implementor<T> {
+        private final PlannerContext ctx;
+        private final DataContext root;
+        private final ScalarFactory factory;
+        private Deque<Sink<T>> stack;
+
+        public Implementor(DataContext root) {
+            this.root = root;
+
+            ctx = PLANNER_CONTEXT.get(root);
+            factory = new ScalarFactory(new RexBuilder(ctx.typeFactory()));
+            stack = new ArrayDeque<>();
+        }
+
+        public Node<T> implement(SenderRel rel) {
+            assert stack.isEmpty();
+
+            GridCacheVersion id = QUERY_ID.get(root);
+            long exchangeId = rel.target.exchangeId();
+            NodesMapping mapping = rel.target.mapping();
+            List<UUID> targets = mapping.nodes();
+            DistributionTrait distribution = rel.target.distribution();
+            DestinationFunctionFactory destFactory = distribution.destinationFunctionFactory();
+            DestinationFunction function = destFactory.create(ctx, mapping, distribution.keys());
+
+            Outbox<T> res = new Outbox<>(id, exchangeId, targets, function);
+
+            stack.push(res.sink());
+
+            res.source(source(rel.getInput()));
+
+            return res;
+        }
+
+        public Node<T> implement(FilterRel rel) {
+            assert !stack.isEmpty();
+
+            FilterNode res = new FilterNode((Sink<Object[]>) stack.pop(), factory.filterPredicate(root, rel.getCondition(), rel.getRowType()));
+
+            stack.push((Sink<T>) res.sink());
+
+            res.source(source(rel.getInput()));
+
+            return (Node<T>) res;
+        }
+
+        public Node<T> implement(ProjectRel rel) {
+            assert !stack.isEmpty();
+
+            ProjectNode res = new ProjectNode((Sink<Object[]>) stack.pop(), factory.projectExpression(root, rel.getProjects(), rel.getInput().getRowType()));
+
+            stack.push((Sink<T>) res.sink());
+
+            res.source(source(rel.getInput()));
+
+            return (Node<T>) res;
+        }
+
+        public Node<T> implement(JoinRel rel) {
+            assert !stack.isEmpty();
+
+            JoinNode res = new JoinNode((Sink<Object[]>) stack.pop(), factory.joinExpression(root, rel.getCondition(), rel.getLeft().getRowType(), rel.getRight().getRowType()));
+
+            stack.push((Sink<T>) res.sink(1));
+            stack.push((Sink<T>) res.sink(0));
+
+            res.sources(sources(rel.getInputs()));
+
+            return (Node<T>) res;
+        }
+
+        public Node<T> implement(ScanRel rel) {
+            assert !stack.isEmpty();
+
+            Iterable<Object[]> source = rel.getTable().unwrap(ScannableTable.class).scan(root);
+
+            return (Node<T>) new ScanNode((Sink<Object[]>)stack.pop(), source);
+        }
+
+        public Node<T> implement(ReceiverRel rel) {
+            throw new AssertionError(); // TODO
+        }
+
+        private Source source(RelNode rel) {
+            if (rel.getConvention() != INTERPRETABLE)
+                throw new IllegalStateException("INTERPRETABLE is required.");
+
+            return ((InterRel)rel).implement(this);
+        }
+
+        private List<Source> sources(List<RelNode> rels) {
+            ArrayList<Source> res = new ArrayList<>(rels.size());
+
+            for (RelNode rel : rels) {
+                res.add(source(rel));
+            }
+
+            return res;
+        }
+
+        public Node<T> go(RelNode rel) {
+            if (rel.getConvention() != INTERPRETABLE)
+                throw new IllegalStateException("INTERPRETABLE is required.");
+
+            if (rel instanceof SenderRel)
+                return implement((SenderRel)rel);
+
+            ConsumerNode res = new ConsumerNode();
+
+            stack.push((Sink<T>) res.sink());
+
+            res.source(source(rel));
+
+            return (Node<T>) res;
+        }
+    }
+
+    /**
+     *
+     */
+    private static class ScalarFactory {
+        private final JaninoRexCompiler rexCompiler;
+        private final RexBuilder builder;
+
+        private ScalarFactory(RexBuilder builder) {
+            rexCompiler = new JaninoRexCompiler(builder);
+            this.builder = builder;
+        }
+
+        public <T> Predicate<T> filterPredicate(DataContext root, RexNode filter, RelDataType rowType) {
+            System.out.println("filterPredicate for" + filter);
+
+            Scalar scalar = rexCompiler.compile(ImmutableList.of(filter), rowType);
+            Context ctx = Util.createContext(root);
+
+            return new FilterPredicate<>(ctx, scalar);
+        }
+
+        public <T> Function<T, T> projectExpression(DataContext root, List<RexNode> projects, RelDataType rowType) {
+            System.out.println("joinExpression for" + projects);
+
+            Scalar scalar = rexCompiler.compile(projects, rowType);
+            Context ctx = Util.createContext(root);
+            int count = projects.size();
+
+            return new ProjectExpression<>(ctx, scalar, count);
+        }
+
+        public <T> BiFunction<T, T, T> joinExpression(DataContext root, RexNode expression, RelDataType leftType, RelDataType rightType) {
+            System.out.println("joinExpression for" + expression);
+
+            RelDataType rowType = combinedType(leftType, rightType);
+
+            Scalar scalar = rexCompiler.compile(ImmutableList.of(expression), rowType);
+            Context ctx = Util.createContext(root);
+            ctx.values = new Object[rowType.getFieldCount()];
+
+            return new JoinExpression<>(ctx, scalar);
+        }
+
+        private RelDataType combinedType(RelDataType... types) {
+            RelDataTypeFactory.Builder typeBuilder = new RelDataTypeFactory.Builder(builder.getTypeFactory());
+
+            for (RelDataType type : types)
+                typeBuilder.addAll(type.getFieldList());
+
+            return typeBuilder.build();
+        }
+
+        private static class FilterPredicate<T> implements Predicate<T> {
+            private final Context ctx;
+            private final Scalar scalar;
+            private final Object[] vals;
+
+            private FilterPredicate(Context ctx, Scalar scalar) {
+                this.ctx = ctx;
+                this.scalar = scalar;
+
+                vals = new Object[1];
+            }
+
+            @Override public boolean test(T r) {
+                ctx.values = (Object[]) r;
+                scalar.execute(ctx, vals);
+                return (Boolean) vals[0];
+            }
+        }
+
+        private static class JoinExpression<T> implements BiFunction<T, T, T> {
+            private final Object[] vals;
+            private final Context ctx;
+            private final Scalar scalar;
+
+            private Object[] left0;
+
+            private JoinExpression(Context ctx, Scalar scalar) {
+                this.ctx = ctx;
+                this.scalar = scalar;
+
+                vals = new Object[1];
+            }
+
+            @Override public T apply(T left, T right) {
+                if (left0 != left) {
+                    left0 = (Object[]) left;
+                    System.arraycopy(left0, 0, ctx.values, 0, left0.length);
+                }
+
+                Object[] right0 = (Object[]) right;
+                System.arraycopy(right0, 0, ctx.values, left0.length, right0.length);
+
+                scalar.execute(ctx, vals);
+
+                if ((Boolean) vals[0])
+                    return (T) Arrays.copyOf(ctx.values, ctx.values.length);
+
+                return null;
+            }
+        }
+
+        private static class ProjectExpression<T> implements Function<T, T> {
+            private final Context ctx;
+            private final Scalar scalar;
+            private final int count;
+
+            private ProjectExpression(Context ctx, Scalar scalar, int count) {
+                this.ctx = ctx;
+                this.scalar = scalar;
+                this.count = count;
+            }
+
+            @Override public T apply(T r) {
+                ctx.values = (Object[]) r;
+                Object[] res = new Object[count];
+                scalar.execute(ctx, res);
+
+                return (T) res;
+            }
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Node.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Node.java
index 71266ff..2472e8e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Node.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Node.java
@@ -21,7 +21,7 @@ import java.util.List;
 /**
  *
  */
-public interface Node<T> {
+public interface Node<T> extends Source {
     Sink<T> sink(int idx);
     void sources(List<Source> sources);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScalarFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScalarFactory.java
deleted file mode 100644
index 62847ac..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScalarFactory.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
- * Licensed under the GridGain Community Edition License (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.exec;
-
-import com.google.common.collect.ImmutableList;
-import java.util.Arrays;
-import java.util.function.BiFunction;
-import java.util.function.Predicate;
-import org.apache.calcite.DataContext;
-import org.apache.calcite.interpreter.Context;
-import org.apache.calcite.interpreter.JaninoRexCompiler;
-import org.apache.calcite.interpreter.Scalar;
-import org.apache.calcite.interpreter.Util;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexNode;
-
-/**
- *
- */
-public class ScalarFactory {
-    private final JaninoRexCompiler rexCompiler;
-    private final RexBuilder builder;
-
-    public ScalarFactory(RexBuilder builder) {
-        rexCompiler = new JaninoRexCompiler(builder);
-        this.builder = builder;
-    }
-
-    public <T> Predicate<T> filterPredicate(DataContext root, RexNode filter, RelDataType rowType) {
-        Scalar scalar = rexCompiler.compile(ImmutableList.of(filter), rowType);
-        Context ctx = Util.createContext(root);
-
-        return new FilterPredicate<>(ctx, scalar);
-    }
-
-    public <T> BiFunction<T, T, T> joinExpression(DataContext root, RexNode expression, RelDataType leftType, RelDataType rightType) {
-        RelDataType rowType = combinedType(leftType, rightType);
-
-        Scalar scalar = rexCompiler.compile(ImmutableList.of(expression), rowType);
-        Context ctx = Util.createContext(root);
-        ctx.values = new Object[rowType.getFieldCount()];
-
-        return new JoinExpression<>(ctx, scalar);
-    }
-
-    private RelDataType combinedType(RelDataType... types) {
-        RelDataTypeFactory.Builder typeBuilder = new RelDataTypeFactory.Builder(builder.getTypeFactory());
-
-        for (RelDataType type : types)
-            typeBuilder.addAll(type.getFieldList());
-
-        return typeBuilder.build();
-    }
-
-    private static class FilterPredicate<T> implements Predicate<T> {
-        private final Context ctx;
-        private final Scalar scalar;
-        private final Object[] vals;
-
-        private FilterPredicate(Context ctx, Scalar scalar) {
-            this.ctx = ctx;
-            this.scalar = scalar;
-
-            vals = new Object[1];
-        }
-
-        @Override public boolean test(T r) {
-            ctx.values = (Object[]) r;
-            scalar.execute(ctx, vals);
-            return (Boolean) vals[0];
-        }
-    }
-
-    private static class JoinExpression<T> implements BiFunction<T, T, T> {
-        private final Object[] vals;
-        private final Context ctx;
-        private final Scalar scalar;
-
-        private Object[] left0;
-
-        private JoinExpression(Context ctx, Scalar scalar) {
-            this.ctx = ctx;
-            this.scalar = scalar;
-
-            vals = new Object[1];
-        }
-
-        @Override public T apply(T left, T right) {
-            if (left0 != left) {
-                left0 = (Object[]) left;
-                System.arraycopy(left0, 0, ctx.values, 0, left0.length);
-            }
-
-            Object[] right0 = (Object[]) right;
-            System.arraycopy(right0, 0, ctx.values, left0.length, right0.length);
-
-            scalar.execute(ctx, vals);
-
-            if ((Boolean) vals[0])
-                return (T) Arrays.copyOf(ctx.values, ctx.values.length);
-
-            return null;
-        }
-    }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java
index 19edc8f..37c128a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ScanNode.java
@@ -22,18 +22,19 @@ import java.util.List;
 /**
  *
  */
-public class ScanNode implements SingleNode<Object[]>, Source{
+public class ScanNode implements SingleNode<Object[]> {
     private static final Object[] END = new Object[0];
 
     /** */
     private final Sink<Object[]> target;
-    private final Iterator<Object[]> it;
+    private final Iterable<Object[]> source;
 
+    private Iterator<Object[]> it;
     private Object[] row;
 
-    protected ScanNode(Sink<Object[]> target, Iterator<Object[]> it) {
+    protected ScanNode(Sink<Object[]> target, Iterable<Object[]> source) {
         this.target = target;
-        this.it = it;
+        this.source = source;
     }
 
     @Override public void signal() {
@@ -45,6 +46,9 @@ public class ScanNode implements SingleNode<Object[]>, Source{
 
         row = null;
 
+        if (it == null)
+            it = source.iterator();
+
         while (it.hasNext()) {
             row = it.next();
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
index 3ace003..b282ea7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
@@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.query.calcite.metadata;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.calcite.util.Pair;
-import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.splitter.Source;
 
 /**
@@ -26,9 +26,9 @@ import org.apache.ignite.internal.processors.query.calcite.splitter.Source;
  */
 public class FragmentInfo {
     private final NodesMapping mapping;
-    private final ImmutableList<Pair<Receiver, Source>> sources;
+    private final ImmutableList<Pair<IgniteReceiver, Source>> sources;
 
-    public FragmentInfo(Pair<Receiver, Source> source) {
+    public FragmentInfo(Pair<IgniteReceiver, Source> source) {
         this(ImmutableList.of(source), null);
     }
 
@@ -36,7 +36,7 @@ public class FragmentInfo {
         this(null, mapping);
     }
 
-    public FragmentInfo(ImmutableList<Pair<Receiver, Source>> sources, NodesMapping mapping) {
+    public FragmentInfo(ImmutableList<Pair<IgniteReceiver, Source>> sources, NodesMapping mapping) {
         this.sources = sources;
         this.mapping = mapping;
     }
@@ -45,7 +45,7 @@ public class FragmentInfo {
         return mapping;
     }
 
-    public ImmutableList<Pair<Receiver, Source>> sources() {
+    public ImmutableList<Pair<IgniteReceiver, Source>> sources() {
         return sources;
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
index 9f02f0c..503b424 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
@@ -29,8 +29,8 @@ import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentMetadata;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
 import org.apache.ignite.internal.processors.query.calcite.util.Edge;
 import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
 
@@ -93,7 +93,7 @@ public class IgniteMdFragmentInfo implements MetadataHandler<FragmentMetadata> {
         return new OptimisticPlanningException(msg, new Edge(rel, rel.getRight(), 1), cause);
     }
 
-    public FragmentInfo getFragmentInfo(Receiver rel, RelMetadataQuery mq) {
+    public FragmentInfo getFragmentInfo(IgniteReceiver rel, RelMetadataQuery mq) {
         return new FragmentInfo(Pair.of(rel, rel.source()));
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
index 0a76b4d..c93d04b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
@@ -24,9 +24,9 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import org.jetbrains.annotations.NotNull;
 
@@ -44,8 +44,8 @@ public class RelMetadataQueryEx extends RelMetadataQuery {
             IgniteJoin.class,
             IgniteProject.class,
             IgniteTableScan.class,
-            Receiver.class,
-            Sender.class
+            IgniteReceiver.class,
+            IgniteSender.class
         ));
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ContextValue.java
similarity index 52%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractNode.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ContextValue.java
index e832da4..0fe3dc5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/AbstractNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/ContextValue.java
@@ -14,31 +14,31 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.query.calcite.exec;
+package org.apache.ignite.internal.processors.query.calcite.prepare;
 
-import java.util.Collections;
-import java.util.List;
+import org.apache.calcite.DataContext;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 
 /**
  *
  */
-public abstract class AbstractNode<T> implements Node<T>, Source {
-    protected final Sink<T> target;
-    protected List<Source> sources;
+public enum ContextValue {
+    QUERY_ID("_query_id", GridCacheVersion.class),
+    PLANNER_CONTEXT("_planner_context", PlannerContext.class);
 
-    protected AbstractNode(Sink<T> target) {
-        this.target = target;
-    }
+    private final String valueName;
+    private final Class type;
 
-    @Override public void sources(List<Source> sources) {
-        this.sources = Collections.unmodifiableList(sources);
+    ContextValue(String valueName, Class type) {
+        this.valueName = valueName;
+        this.type = type;
     }
 
-    public void signal(int idx) {
-        sources.get(idx).signal();
+    public String valueName() {
+        return valueName;
     }
 
-    @Override public void signal() {
-        sources.forEach(Source::signal);
+    public <T> T get(DataContext ctx) {
+        return (T) type.cast(ctx.get(valueName));
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java
index 06aaa024..3c2d14c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java
@@ -26,15 +26,9 @@ import org.apache.calcite.schema.SchemaPlus;
 /**
  *
  */
-class DataContextImpl implements DataContext {
+public class DataContextImpl implements DataContext {
     /** */
-    private final JavaTypeFactory typeFactory;
-
-    /** */
-    private final SchemaPlus schema;
-
-    /** */
-    private final QueryProvider queryProvider;
+    private final PlannerContext ctx;
 
     /** */
     private final Map<String, Object> params;
@@ -43,31 +37,31 @@ class DataContextImpl implements DataContext {
      * @param params Parameters.
      * @param ctx Query context.
      */
-    DataContextImpl(Map<String, Object> params, PlannerContext ctx) {
-        typeFactory = ctx.typeFactory();
-        schema = ctx.schema();
-        queryProvider = ctx.queryProvider();
-
+    public DataContextImpl(Map<String, Object> params, PlannerContext ctx) {
         this.params = params;
+        this.ctx = ctx;
     }
 
     /** {@inheritDoc} */
     @Override public SchemaPlus getRootSchema() {
-        return schema;
+        return ctx.schema();
     }
 
     /** {@inheritDoc} */
     @Override public JavaTypeFactory getTypeFactory() {
-        return typeFactory;
+        return ctx.typeFactory();
     }
 
     /** {@inheritDoc} */
     @Override public QueryProvider getQueryProvider() {
-        return queryProvider;
+        return ctx.queryProvider();
     }
 
     /** {@inheritDoc} */
     @Override public Object get(String name) {
+        if (ContextValue.PLANNER_CONTEXT.valueName().equals(name))
+            return ctx;
+
         return params.get(name);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
similarity index 90%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
index 0287236..98cf908 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
@@ -28,14 +28,14 @@ import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
 /**
  *
  */
-public final class Receiver extends AbstractRelNode implements IgniteRel {
+public final class IgniteReceiver extends AbstractRelNode implements IgniteRel {
     private final Source source;
 
     /**
      * @param cluster Cluster this relational expression belongs to
      * @param traits Trait set.
      */
-    public Receiver(RelOptCluster cluster, RelTraitSet traits, RelDataType rowType, Source source) {
+    public IgniteReceiver(RelOptCluster cluster, RelTraitSet traits, RelDataType rowType, Source source) {
         super(cluster, traits);
         this.rowType = rowType;
         this.source = source;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
similarity index 79%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
index fb1f277..e89712a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
@@ -16,6 +16,7 @@
 
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
+import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
@@ -25,12 +26,11 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdDist
 import org.apache.ignite.internal.processors.query.calcite.splitter.Target;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
-import org.jetbrains.annotations.NotNull;
 
 /**
  *
  */
-public final class Sender extends SingleRel implements IgniteRel {
+public final class IgniteSender extends SingleRel implements IgniteRel {
     private Target target;
 
     /**
@@ -39,16 +39,20 @@ public final class Sender extends SingleRel implements IgniteRel {
      * @param traits Trait set.
      * @param input Input relational expression
      */
-    public Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
+    public IgniteSender(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
         super(cluster, traits, input);
     }
 
-    private Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input, @NotNull Target target) {
+    private IgniteSender(RelOptCluster cluster, RelTraitSet traits, RelNode input, Target target) {
         super(cluster, traits, input);
 
         this.target = target;
     }
 
+    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+        return new IgniteSender(getCluster(), traitSet, sole(inputs), target);
+    }
+
     /** {@inheritDoc} */
     @Override public <T> T implement(RelImplementor<T> implementor) {
         return implementor.implement(this);
@@ -62,7 +66,7 @@ public final class Sender extends SingleRel implements IgniteRel {
         return target;
     }
 
-    public static Sender create(RelNode input, Target target) {
+    public static IgniteSender create(RelNode input, Target target) {
         RelOptCluster cluster = input.getCluster();
         RelMetadataQuery mq = cluster.getMetadataQuery();
 
@@ -70,6 +74,6 @@ public final class Sender extends SingleRel implements IgniteRel {
             .replace(IgniteRel.IGNITE_CONVENTION)
             .replaceIf(DistributionTraitDef.INSTANCE, () -> IgniteMdDistribution.distribution(input, mq));
 
-        return new Sender(cluster, traits, input, target);
+        return new IgniteSender(cluster, traits, input, target);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java
index 24938a1..a243f52 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite.rule;
 
 import org.apache.calcite.tools.RuleSet;
 import org.apache.calcite.tools.RuleSets;
+import org.apache.ignite.internal.processors.query.calcite.exec.Interpretable;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
 
 /**
@@ -37,6 +38,13 @@ public enum PlannerPhase {
         @Override public RuleSet getRules(PlannerContext ctx) {
             return RuleSets.ofList(IgniteRules.logicalRules(ctx));
         }
+    },
+
+    /** */
+    PHYSICAL("Execution tree building") {
+        @Override public RuleSet getRules(PlannerContext ctx) {
+            return RuleSets.ofList(Interpretable.RULES);
+        }
     };
 
     public final String description;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index 3007a85..14b8191 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -17,12 +17,15 @@
 
 package org.apache.ignite.internal.processors.query.calcite.schema;
 
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.schema.ScannableTable;
 import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.schema.impl.AbstractTable;
 import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo;
@@ -36,7 +39,7 @@ import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 
 /** */
-public class IgniteTable extends AbstractTable implements TranslatableTable {
+public class IgniteTable extends AbstractTable implements TranslatableTable, ScannableTable {
     private final String tableName;
     private final String cacheName;
     private final RowType rowType;
@@ -82,4 +85,8 @@ public class IgniteTable extends AbstractTable implements TranslatableTable {
     public FragmentInfo fragmentInfo(PlannerContext ctx) {
         return new FragmentInfo(ctx.mapForCache(CU.cacheId(cacheName), ctx.topologyVersion()));
     }
+
+    @Override public Enumerable<Object[]> scan(DataContext root) {
+        throw new AssertionError(); // TODO
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
index d1a00cd..24534ec 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/ReceiverNode.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
 import java.util.List;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
-import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.serialize.type.DataType;
 import org.apache.ignite.internal.processors.query.calcite.splitter.Source;
 import org.apache.ignite.internal.processors.query.calcite.splitter.SourceImpl;
@@ -38,14 +38,14 @@ public class ReceiverNode extends RelGraphNode {
         this.source = source;
     }
 
-    public static ReceiverNode create(Receiver rel) {
+    public static ReceiverNode create(IgniteReceiver rel) {
         Source source = new SourceImpl(rel.source().exchangeId(), rel.source().mapping());
 
         return new ReceiverNode(rel.getTraitSet(), DataType.fromType(rel.getRowType()), source);
     }
 
     @Override public RelNode toRel(ConversionContext ctx, List<RelNode> children) {
-        return new Receiver(ctx.getCluster(),
+        return new IgniteReceiver(ctx.getCluster(),
             traitSet.toTraitSet(ctx.getCluster()),
             dataType.toRelDataType(ctx.getTypeFactory()),
             source);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java
index c3fb982..6e534d2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java
@@ -24,10 +24,10 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
 import org.apache.ignite.internal.processors.query.calcite.serialize.expression.RexToExpTranslator;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
@@ -69,11 +69,11 @@ public class RelToGraphConverter {
             return new Item(graph.addNode(curParent, TableScanNode.create(rel)), Commons.cast(rel.getInputs()));
         }
 
-        @Override public Item implement(Receiver rel) {
+        @Override public Item implement(IgniteReceiver rel) {
             return new Item(graph.addNode(curParent, ReceiverNode.create(rel)), Collections.emptyList());
         }
 
-        @Override public Item implement(Sender rel) {
+        @Override public Item implement(IgniteSender rel) {
             return new Item(graph.addNode(curParent, SenderNode.create(rel)), Commons.cast(rel.getInputs()));
         }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
index 94671dd..5164eed 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/SenderNode.java
@@ -18,7 +18,7 @@ package org.apache.ignite.internal.processors.query.calcite.serialize.relation;
 
 import java.util.List;
 import org.apache.calcite.rel.RelNode;
-import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.processors.query.calcite.splitter.Target;
 import org.apache.ignite.internal.util.typedef.F;
 
@@ -32,11 +32,11 @@ public class SenderNode extends RelGraphNode {
         this.target = target;
     }
 
-    public static SenderNode create(Sender rel) {
+    public static SenderNode create(IgniteSender rel) {
         return new SenderNode(rel.target());
     }
 
     @Override public RelNode toRel(ConversionContext ctx, List<RelNode> children) {
-        return Sender.create(F.first(children), target);
+        return IgniteSender.create(F.first(children), target);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
index 45c4591..e523f50 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
@@ -25,8 +25,8 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo
 import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdFragmentInfo;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
-import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import org.apache.ignite.internal.util.typedef.F;
 
@@ -55,11 +55,11 @@ public class Fragment implements Source {
         else
             mapping = info.mapping().deduplicate();
 
-        ImmutableList<Pair<Receiver, Source>> sources = info.sources();
+        ImmutableList<Pair<IgniteReceiver, Source>> sources = info.sources();
 
         if (!F.isEmpty(sources)) {
-            for (Pair<Receiver, Source> input : sources) {
-                Receiver receiver = input.left;
+            for (Pair<IgniteReceiver, Source> input : sources) {
+                IgniteReceiver receiver = input.left;
                 Source source = input.right;
 
                 source.init(mapping, receiver.distribution(), ctx, mq);
@@ -74,7 +74,7 @@ public class Fragment implements Source {
     @Override public void init(NodesMapping mapping, DistributionTrait distribution, PlannerContext ctx, RelMetadataQuery mq) {
         assert remote();
 
-        ((Sender) root).init(new TargetImpl(exchangeId, mapping, distribution));
+        ((IgniteSender) root).init(new TargetImpl(exchangeId, mapping, distribution));
 
         init(ctx, mq);
     }
@@ -88,6 +88,6 @@ public class Fragment implements Source {
     }
 
     private boolean remote() {
-        return root instanceof Sender;
+        return root instanceof IgniteSender;
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
index b0369bd..fd0c75f 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
@@ -24,8 +24,8 @@ import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPlanningException;
 import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
-import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.processors.query.calcite.util.Edge;
 import org.apache.ignite.internal.util.typedef.F;
 
@@ -62,11 +62,11 @@ public class QueryPlan {
                 RelOptCluster cluster = child.getCluster();
                 RelTraitSet traitSet = child.getTraitSet();
 
-                Sender sender = new Sender(cluster, traitSet, child);
+                IgniteSender sender = new IgniteSender(cluster, traitSet, child);
                 Fragment fragment = new Fragment(sender);
                 fragments.add(fragment);
 
-                parent.replaceInput(edge.childIdx(), new Receiver(cluster, traitSet, sender.getRowType(), fragment));
+                parent.replaceInput(edge.childIdx(), new IgniteReceiver(cluster, traitSet, sender.getRowType(), fragment));
             }
         }
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
index c15c46a..53ae9df 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
@@ -23,9 +23,9 @@ import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.processors.query.calcite.util.IgniteRelShuttle;
 
 /**
@@ -50,18 +50,18 @@ public class Splitter extends IgniteRelShuttle {
         RelTraitSet inputTraits = input.getTraitSet();
         RelTraitSet outputTraits = rel.getTraitSet();
 
-        Sender sender = new Sender(cluster, inputTraits, visit(input));
+        IgniteSender sender = new IgniteSender(cluster, inputTraits, visit(input));
         Fragment fragment = new Fragment(sender);
         fragments.add(fragment);
 
-        return new Receiver(cluster, outputTraits, sender.getRowType(), fragment);
+        return new IgniteReceiver(cluster, outputTraits, sender.getRowType(), fragment);
     }
 
-    @Override public RelNode visit(Receiver rel) {
+    @Override public RelNode visit(IgniteReceiver rel) {
         throw new AssertionError("An attempt to split an already split task.");
     }
 
-    @Override public RelNode visit(Sender rel) {
+    @Override public RelNode visit(IgniteSender rel) {
         throw new AssertionError("An attempt to split an already split task.");
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java
index eb1129a..9734885 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java
@@ -22,9 +22,9 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
 
 /**
  *
@@ -42,11 +42,11 @@ public class IgniteRelShuttle extends RelShuttleImpl {
         return visitChild(rel, 0, rel.getInput());
     }
 
-    public RelNode visit(Receiver rel) {
+    public RelNode visit(IgniteReceiver rel) {
         return rel;
     }
 
-    public RelNode visit(Sender rel) {
+    public RelNode visit(IgniteSender rel) {
         return visitChild(rel, 0, rel.getInput());
     }
 
@@ -65,10 +65,10 @@ public class IgniteRelShuttle extends RelShuttleImpl {
             return visit((IgniteFilter)rel);
         if (rel instanceof IgniteProject)
             return visit((IgniteProject)rel);
-        if (rel instanceof Receiver)
-            return visit((Receiver)rel);
-        if (rel instanceof Sender)
-            return visit((Sender)rel);
+        if (rel instanceof IgniteReceiver)
+            return visit((IgniteReceiver)rel);
+        if (rel instanceof IgniteSender)
+            return visit((IgniteSender)rel);
         if (rel instanceof IgniteTableScan)
             return visit((IgniteTableScan)rel);
         if (rel instanceof IgniteJoin)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelImplementor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelImplementor.java
index 58ba064..bcbe604 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelImplementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/RelImplementor.java
@@ -20,10 +20,10 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
 
 /**
  *
@@ -34,7 +34,7 @@ public interface RelImplementor<T> {
     T implement(IgniteJoin rel);
     T implement(IgniteProject rel);
     T implement(IgniteTableScan rel);
-    T implement(Receiver rel);
-    T implement(Sender rel);
+    T implement(IgniteReceiver rel);
+    T implement(IgniteSender rel);
     T implement(IgniteRel other);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ScanIterator.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScanIterator.java
similarity index 95%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ScanIterator.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScanIterator.java
index c8d8a16..f603fcc 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ScanIterator.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/TableScanIterator.java
@@ -34,7 +34,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UNDEFIN
 /**
  *
  */
-public class ScanIterator<T> extends GridCloseableIteratorAdapter<T> {
+public class TableScanIterator<T> extends GridCloseableIteratorAdapter<T> {
     private final int cacheId;
     private final Iterator<GridDhtLocalPartition> parts;
     private final Function<CacheDataRow, T> typeWrapper;
@@ -54,7 +54,7 @@ public class ScanIterator<T> extends GridCloseableIteratorAdapter<T> {
      */
     private T next;
 
-    public ScanIterator(int cacheId, Iterator<GridDhtLocalPartition> parts, Function<CacheDataRow, T> typeWrapper,
+    public TableScanIterator(int cacheId, Iterator<GridDhtLocalPartition> parts, Function<CacheDataRow, T> typeWrapper,
         Predicate<CacheDataRow> typeFilter) {
         this.cacheId = cacheId;
         this.parts = parts;
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index 644633b..a5159d3 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -21,7 +21,11 @@ package org.apache.ignite.internal.processors.query.calcite;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import org.apache.calcite.DataContext;
+import org.apache.calcite.linq4j.Enumerable;
+import org.apache.calcite.linq4j.Linq4j;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.ConventionTraitDef;
@@ -34,9 +38,15 @@ import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.calcite.exec.ConsumerNode;
+import org.apache.ignite.internal.processors.query.calcite.exec.Interpretable;
+import org.apache.ignite.internal.processors.query.calcite.exec.Node;
 import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import org.apache.ignite.internal.processors.query.calcite.metadata.TableDistributionService;
+import org.apache.ignite.internal.processors.query.calcite.prepare.ContextValue;
+import org.apache.ignite.internal.processors.query.calcite.prepare.DataContextImpl;
 import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
 import org.apache.ignite.internal.processors.query.calcite.prepare.Query;
@@ -56,17 +66,22 @@ import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTra
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.type.RowType;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.apache.ignite.internal.processors.query.calcite.exec.Interpretable.INTERPRETABLE;
+
 /**
  *
  */
-//@WithSystemProperty(key = "calcite.debug", value = "true")
+@WithSystemProperty(key = "calcite.debug", value = "true")
 public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
     private static GridTestKernalContext kernalContext;
@@ -89,30 +104,55 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
                 .field("name", String.class)
                 .field("projectId", Integer.class)
                 .field("cityId", Integer.class)
-                .build()));
+                .build()){
+            @Override public Enumerable<Object[]> scan(DataContext root) {
+                return Linq4j.asEnumerable(Arrays.asList(
+                    new Object[]{0, null, 0, "Igor", 0, 1},
+                    new Object[]{1, null, 1, "Roman", 0, 0}
+                ));
+            }
+        });
 
         publicSchema.addTable(new IgniteTable("Project", "Project",
             RowType.builder()
                 .keyField("id", Integer.class, true)
                 .field("name", String.class)
                 .field("ver", Integer.class)
-                .build()));
-
-
+                .build()){
+            @Override public Enumerable<Object[]> scan(DataContext root) {
+                return Linq4j.asEnumerable(Arrays.asList(
+                    new Object[]{0, null, 0, "Calcite", 1},
+                    new Object[]{1, null, 1, "Ignite", 1}
+                ));
+            }
+        });
 
         publicSchema.addTable(new IgniteTable("Country", "Country",
             RowType.builder()
                 .keyField("id", Integer.class, true)
                 .field("name", String.class)
                 .field("countryCode", Integer.class)
-                .build()));
+                .build()){
+            @Override public Enumerable<Object[]> scan(DataContext root) {
+                return Linq4j.asEnumerable(Arrays.<Object[]>asList(
+                    new Object[]{0, null, 0, "Russia", 7}
+                ));
+            }
+        });
 
         publicSchema.addTable(new IgniteTable("City", "City",
             RowType.builder()
                 .keyField("id", Integer.class, true)
                 .field("name", String.class)
                 .field("countryId", Integer.class)
-                .build()));
+                .build()){
+            @Override public Enumerable<Object[]> scan(DataContext root) {
+                return Linq4j.asEnumerable(Arrays.asList(
+                    new Object[]{0, null, 0, "Moscow", 0},
+                    new Object[]{1, null, 1, "Saint Petersburg", 0}
+                ));
+            }
+        });
 
         schema = Frameworks
             .createRootSchema(false)
@@ -487,6 +527,96 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
     }
 
     @Test
+    public void testPhysicalPlan() throws Exception {
+        String sql = "SELECT d.id, d.name, d.projectId, p.name0, p.ver0 " +
+            "FROM PUBLIC.Developer d JOIN (" +
+            "SELECT pp.id as id0, pp.name as name0, pp.ver as ver0 FROM PUBLIC.Project pp" +
+            ") p " +
+            "ON d.projectId = p.id0 " +
+            "WHERE (d.projectId + 1) > ?";
+
+        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{-10}, this::context);
+
+        assertNotNull(ctx);
+
+        RelTraitDef[] traitDefs = {
+            DistributionTraitDef.INSTANCE,
+            ConventionTraitDef.INSTANCE
+        };
+
+        try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
+            assertNotNull(planner);
+
+            Query query = Commons.plannerContext(ctx).query();
+
+            assertNotNull(planner);
+
+            // Parse
+            SqlNode sqlNode = planner.parse(query.sql());
+
+            // Validate
+            sqlNode = planner.validate(sqlNode);
+
+            // Convert to Relational operators graph
+            RelRoot relRoot = planner.rel(sqlNode);
+
+            RelNode rel = relRoot.rel;
+
+            // Transformation chain
+            rel = planner.transform(PlannerType.HEP, PlannerPhase.SUBQUERY_REWRITE, rel, rel.getTraitSet());
+
+            RelTraitSet desired = rel.getCluster().traitSet()
+                .replace(IgniteRel.IGNITE_CONVENTION)
+                .replace(IgniteDistributions.single())
+                .simplify();
+
+            rel = planner.transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, rel, desired);
+
+            assertNotNull(relRoot);
+
+            QueryPlan plan = new Splitter().go((IgniteRel) rel);
+
+            assertNotNull(plan);
+
+            plan.init(ctx);
+
+            assertNotNull(plan);
+
+            assertTrue(plan.fragments().size() == 2);
+
+            desired = rel.getCluster().traitSetOf(INTERPRETABLE);
+
+            RelNode phys = planner.transform(PlannerType.VOLCANO, PlannerPhase.PHYSICAL, plan.fragments().get(1).root(), desired);
+
+            assertNotNull(phys);
+
+            Map<String, Object> params = ctx.query().params(F.asMap(ContextValue.QUERY_ID.valueName(), new GridCacheVersion()));
+
+            Interpretable.Implementor<Object[]> implementor = new Interpretable.Implementor<>(new DataContextImpl(params, ctx));
+
+            Node<Object[]> exec = implementor.go(phys.getInput(0));
+
+            assertNotNull(exec);
+
+            assertTrue(exec instanceof ConsumerNode);
+
+            ConsumerNode consumer = (ConsumerNode) exec;
+
+            assertTrue(consumer.hasNext());
+
+            ArrayList<Object[]> res = new ArrayList<>();
+
+            while (consumer.hasNext())
+                res.add(consumer.next());
+
+            assertFalse(res.isEmpty());
+
+            Assert.assertArrayEquals(new Object[]{0, "Igor", 0, "Calcite", 1}, res.get(0));
+            Assert.assertArrayEquals(new Object[]{1, "Roman", 0, "Calcite", 1}, res.get(1));
+        }
+    }
+
+    @Test
     public void testSplitterCollocatedReplicatedReplicated() throws Exception {
         String sql = "SELECT d.id, (d.id + 1) as id2, d.name, d.projectId, p.id0, p.ver0 " +
             "FROM PUBLIC.Developer d JOIN (" +
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionTest.java
index 744280a..dacf823 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionTest.java
@@ -50,14 +50,14 @@ public class ExecutionTest extends GridCommonAbstractTest {
             new Object[]{1, "Roman", "Kondakov"},
             new Object[]{2, "Ivan", "Pavlukhin"},
             new Object[]{3, "Alexey", "Goncharuk"}
-        ).iterator());
+        ));
 
         ScanNode projects = new ScanNode(join.sink(1), Arrays.asList(
             new Object[]{0, 2, "Calcite"},
             new Object[]{1, 1, "SQL"},
             new Object[]{2, 2, "Ignite"},
             new Object[]{3, 0, "Core"}
-        ).iterator());
+        ));
 
         join.sources(Arrays.asList(persons, projects));
 


Mime
View raw message