ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvvinbl...@apache.org
Subject [ignite] branch ignite-12248 updated: Turn implementor to visitor
Date Thu, 12 Dec 2019 13:38:33 GMT
This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new 255e289  Turn implementor to visitor
255e289 is described below

commit 255e289a4e1802de162afb0eb90827d38691af73
Author: Igor Seliverstov <gvvinblade@gmail.com>
AuthorDate: Thu Dec 12 16:38:21 2019 +0300

    Turn implementor to visitor
---
 .../query/calcite/cluster/MappingServiceImpl.java  |  6 +-
 .../query/calcite/exchange/ExchangeProcessor.java  |  7 +-
 .../{ImplementorImpl.java => Implementor.java}     | 26 +++----
 .../query/calcite/rel/IgniteExchange.java          |  4 +-
 .../processors/query/calcite/rel/IgniteFilter.java |  4 +-
 .../processors/query/calcite/rel/IgniteJoin.java   |  4 +-
 .../query/calcite/rel/IgniteProject.java           |  4 +-
 .../query/calcite/rel/IgniteReceiver.java          |  4 +-
 .../processors/query/calcite/rel/IgniteRel.java    |  2 +-
 .../query/calcite/rel/IgniteRelShuttle.java        | 76 --------------------
 .../{Implementor.java => IgniteRelVisitor.java}    | 22 +++---
 .../processors/query/calcite/rel/IgniteSender.java |  4 +-
 .../query/calcite/rel/IgniteTableScan.java         |  4 +-
 .../serialize/relation/RelToGraphConverter.java    | 27 +++----
 .../query/calcite/splitter/Splitter.java           | 83 +++++++++++++++++-----
 .../processors/query/calcite/util/Commons.java     |  9 +++
 .../query/calcite/CalciteQueryProcessorTest.java   | 55 +++++++-------
 .../query/calcite/exchange/OutboxTest.java         | 18 ++++-
 18 files changed, 178 insertions(+), 181 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/MappingServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/MappingServiceImpl.java
index 66c7a74..3e91219 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/MappingServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/MappingServiceImpl.java
@@ -55,12 +55,12 @@ public class MappingServiceImpl implements MappingService {
     }
 
     @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer)
{
-        GridCacheContext cctx = ctx.cache().context().cacheContext(cacheId);
+        GridCacheContext<?,?> cctx = ctx.cache().context().cacheContext(cacheId);
 
         return cctx.isReplicated() ? replicatedLocation(cctx, topVer) : partitionedLocation(cctx,
topVer);
     }
 
-    private NodesMapping partitionedLocation(GridCacheContext cctx, AffinityTopologyVersion
topVer) {
+    private NodesMapping partitionedLocation(GridCacheContext<?,?> cctx, AffinityTopologyVersion
topVer) {
         byte flags = NodesMapping.HAS_PARTITIONED_CACHES;
 
         List<List<ClusterNode>> assignments = cctx.affinity().assignments(topVer);
@@ -95,7 +95,7 @@ public class MappingServiceImpl implements MappingService {
         return new NodesMapping(null, res, flags);
     }
 
-    private NodesMapping replicatedLocation(GridCacheContext cctx, AffinityTopologyVersion
topVer) {
+    private NodesMapping replicatedLocation(GridCacheContext<?,?> cctx, AffinityTopologyVersion
topVer) {
         byte flags = NodesMapping.HAS_REPLICATED_CACHES;
 
         if (cctx.config().getNodeFilter() != null)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java
index 692b4fa..6a6810a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java
@@ -24,7 +24,10 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
  *
  */
 public interface ExchangeProcessor {
-    void register(Outbox outbox);
-    void unregister(Outbox outbox);
+    <T> Outbox<T> register(Outbox<T> outbox);
+    <T> void unregister(Outbox<T> outbox);
+    Inbox register(Inbox inbox);
+    void unregister(Inbox inbox);
     void send(GridCacheVersion queryId, long exchangeId, UUID nodeId, int batchId, List<?>
rows);
+    void acknowledge(GridCacheVersion queryId, long exchangeId, UUID nodeId, int batchId);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ImplementorImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Implementor.java
similarity index 87%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ImplementorImpl.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Implementor.java
index 2a65887..cc8ed64 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ImplementorImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/Implementor.java
@@ -37,9 +37,9 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.Implementor;
 import org.apache.ignite.internal.processors.query.calcite.rel.RelOp;
 import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
 import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunctionFactory;
@@ -51,13 +51,13 @@ import static org.apache.ignite.internal.processors.query.calcite.prepare.Contex
 /**
  *
  */
-public class ImplementorImpl implements Implementor<Node<Object[]>>, RelOp<IgniteRel,
Node<Object[]>> {
+public class Implementor implements IgniteRelVisitor<Node<Object[]>>, RelOp<IgniteRel,
Node<Object[]>> {
     private final PlannerContext ctx;
     private final DataContext root;
     private final ScalarFactory factory;
     private Deque<Sink<Object[]>> stack;
 
-    public ImplementorImpl(DataContext root) {
+    public Implementor(DataContext root) {
         this.root = root;
 
         ctx = PLANNER_CONTEXT.get(root);
@@ -65,7 +65,7 @@ public class ImplementorImpl implements Implementor<Node<Object[]>>,
RelOp<Ignit
         stack = new ArrayDeque<>();
     }
 
-    @Override public Node<Object[]> implement(IgniteSender rel) {
+    @Override public Node<Object[]> visit(IgniteSender rel) {
         assert stack.isEmpty();
 
         GridCacheVersion id = QUERY_ID.get(root);
@@ -85,7 +85,7 @@ public class ImplementorImpl implements Implementor<Node<Object[]>>,
RelOp<Ignit
         return res;
     }
 
-    @Override public Node<Object[]> implement(IgniteFilter rel) {
+    @Override public Node<Object[]> visit(IgniteFilter rel) {
         assert !stack.isEmpty();
 
         FilterNode res = new FilterNode(stack.pop(), factory.filterPredicate(root, rel.getCondition(),
rel.getRowType()));
@@ -97,7 +97,7 @@ public class ImplementorImpl implements Implementor<Node<Object[]>>,
RelOp<Ignit
         return res;
     }
 
-    @Override public Node<Object[]> implement(IgniteProject rel) {
+    @Override public Node<Object[]> visit(IgniteProject rel) {
         assert !stack.isEmpty();
 
         ProjectNode res = new ProjectNode(stack.pop(), factory.projectExpression(root, rel.getProjects(),
rel.getInput().getRowType()));
@@ -109,7 +109,7 @@ public class ImplementorImpl implements Implementor<Node<Object[]>>,
RelOp<Ignit
         return res;
     }
 
-    @Override public Node<Object[]> implement(IgniteJoin rel) {
+    @Override public Node<Object[]> visit(IgniteJoin rel) {
         assert !stack.isEmpty();
 
         JoinNode res = new JoinNode(stack.pop(), factory.joinExpression(root, rel.getCondition(),
rel.getLeft().getRowType(), rel.getRight().getRowType()));
@@ -122,7 +122,7 @@ public class ImplementorImpl implements Implementor<Node<Object[]>>,
RelOp<Ignit
         return res;
     }
 
-    @Override public Node<Object[]> implement(IgniteTableScan rel) {
+    @Override public Node<Object[]> visit(IgniteTableScan rel) {
         assert !stack.isEmpty();
 
         Iterable<Object[]> source = rel.getTable().unwrap(ScannableTable.class).scan(root);
@@ -130,15 +130,15 @@ public class ImplementorImpl implements Implementor<Node<Object[]>>,
RelOp<Ignit
         return new ScanNode(stack.pop(), source);
     }
 
-    @Override public Node<Object[]> implement(IgniteReceiver rel) {
+    @Override public Node<Object[]> visit(IgniteReceiver rel) {
         throw new AssertionError(); // TODO
     }
 
-    @Override public Node<Object[]> implement(IgniteExchange rel) {
+    @Override public Node<Object[]> visit(IgniteExchange rel) {
         throw new AssertionError();
     }
 
-    @Override public Node<Object[]> implement(IgniteRel other) {
+    @Override public Node<Object[]> visit(IgniteRel other) {
         throw new AssertionError();
     }
 
@@ -146,7 +146,7 @@ public class ImplementorImpl implements Implementor<Node<Object[]>>,
RelOp<Ignit
         if (rel.getConvention() != IgniteConvention.INSTANCE)
             throw new IllegalStateException("INTERPRETABLE is required.");
 
-        return ((IgniteRel) rel).implement(this);
+        return ((IgniteRel) rel).accept(this);
     }
 
     private List<Source> sources(List<RelNode> rels) {
@@ -161,7 +161,7 @@ public class ImplementorImpl implements Implementor<Node<Object[]>>,
RelOp<Ignit
 
     @Override public Node<Object[]> go(IgniteRel rel) {
         if (rel instanceof IgniteSender)
-            return implement((IgniteSender) rel);
+            return visit((IgniteSender) rel);
 
         ConsumerNode res = new ConsumerNode();
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
index f932020..9d01fbb 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteExchange.java
@@ -34,7 +34,7 @@ public class IgniteExchange extends Exchange implements IgniteRel {
         return new IgniteExchange(getCluster(), traitSet, newInput, newDistribution);
     }
 
-    @Override public <T> T implement(Implementor<T> implementor) {
-        return implementor.implement(this);
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
index 2a0594e..c408769 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteFilter.java
@@ -34,7 +34,7 @@ public class IgniteFilter extends Filter implements IgniteRel {
         return new IgniteFilter(getCluster(), traitSet, input, condition);
     }
 
-    @Override public <T> T implement(Implementor<T> implementor) {
-        return implementor.implement(this);
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java
index 344f4a2..b1ad44b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteJoin.java
@@ -37,7 +37,7 @@ public class IgniteJoin extends Join implements IgniteRel {
         return new IgniteJoin(getCluster(), traitSet, left, right, condition, variablesSet,
joinType);
     }
 
-    @Override public <T> T implement(Implementor<T> implementor) {
-        return implementor.implement(this);
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
index 03fe6ca..42d2af2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteProject.java
@@ -36,7 +36,7 @@ public class IgniteProject extends Project implements IgniteRel {
         return new IgniteProject(getCluster(), traitSet, input, projects, rowType);
     }
 
-    @Override public <T> T implement(Implementor<T> implementor) {
-        return implementor.implement(this);
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
index 60d5477..545c6e8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteReceiver.java
@@ -43,8 +43,8 @@ public class IgniteReceiver extends AbstractRelNode implements IgniteRel
{
         return new IgniteReceiver(getCluster(), traitSet, rowType, source);
     }
 
-    @Override public <T> T implement(Implementor<T> implementor) {
-        return implementor.implement(this);
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
     }
 
     public RelSource source() {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
index b5d876f..8e1567a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRel.java
@@ -22,5 +22,5 @@ import org.apache.calcite.rel.RelNode;
  *
  */
 public interface IgniteRel extends RelNode {
-    <T> T implement(Implementor<T> implementor);
+    <T> T accept(IgniteRelVisitor<T> visitor);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelShuttle.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelShuttle.java
deleted file mode 100644
index 7d3efc9..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelShuttle.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
- * Licensed under the GridGain Community Edition License (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.rel;
-
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelShuttleImpl;
-
-/**
- *
- */
-public class IgniteRelShuttle extends RelShuttleImpl {
-    public RelNode visit(IgniteExchange rel) {
-        return visitChild(rel, 0, rel.getInput());
-    }
-
-    public RelNode visit(IgniteFilter rel) {
-        return visitChild(rel, 0, rel.getInput());
-    }
-
-    public RelNode visit(IgniteProject rel) {
-        return visitChild(rel, 0, rel.getInput());
-    }
-
-    public RelNode visit(IgniteReceiver rel) {
-        return rel;
-    }
-
-    public RelNode visit(IgniteSender rel) {
-        return visitChild(rel, 0, rel.getInput());
-    }
-
-    public RelNode visit(IgniteTableScan rel) {
-        return rel;
-    }
-
-    public RelNode visit(IgniteJoin rel) {
-        return visitChildren(rel);
-    }
-
-    @Override public RelNode visit(RelNode rel) {
-        if (rel instanceof IgniteExchange)
-            return visit((IgniteExchange)rel);
-        if (rel instanceof IgniteFilter)
-            return visit((IgniteFilter)rel);
-        if (rel instanceof IgniteProject)
-            return visit((IgniteProject)rel);
-        if (rel instanceof IgniteReceiver)
-            return visit((IgniteReceiver)rel);
-        if (rel instanceof IgniteSender)
-            return visit((IgniteSender)rel);
-        if (rel instanceof IgniteTableScan)
-            return visit((IgniteTableScan)rel);
-        if (rel instanceof IgniteJoin)
-            return visit((IgniteJoin)rel);
-
-        return visitOther(rel);
-    }
-
-    protected RelNode visitOther(RelNode rel) {
-        return super.visit(rel);
-    }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Implementor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
similarity index 63%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Implementor.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
index 50da38c..6c6ffa1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Implementor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteRelVisitor.java
@@ -19,24 +19,20 @@ package org.apache.ignite.internal.processors.query.calcite.rel;
 /**
  *
  */
-public interface Implementor<T> extends RelOp<IgniteRel, T> {
-    T implement(IgniteSender rel);
+public interface IgniteRelVisitor<T> {
+    T visit(IgniteSender rel);
 
-    T implement(IgniteFilter rel);
+    T visit(IgniteFilter rel);
 
-    T implement(IgniteProject rel);
+    T visit(IgniteProject rel);
 
-    T implement(IgniteJoin rel);
+    T visit(IgniteJoin rel);
 
-    T implement(IgniteTableScan rel);
+    T visit(IgniteTableScan rel);
 
-    T implement(IgniteReceiver rel);
+    T visit(IgniteReceiver rel);
 
-    T implement(IgniteExchange rel);
+    T visit(IgniteExchange rel);
 
-    T implement(IgniteRel other);
-
-    @Override default T go(IgniteRel rel) {
-        return rel.implement(this);
-    }
+    T visit(IgniteRel other);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
index e50b7bf..4763c38 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteSender.java
@@ -43,8 +43,8 @@ public class IgniteSender extends SingleRel implements IgniteRel {
         return new IgniteSender(getCluster(), traitSet, sole(inputs), target);
     }
 
-    @Override public <T> T implement(Implementor<T> implementor) {
-        return implementor.implement(this);
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
     }
 
     public RelTarget target() {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
index 5f01899..59af858 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
@@ -35,7 +35,7 @@ public class IgniteTableScan extends TableScan implements IgniteRel {
         return this;
     }
 
-    @Override public <T> T implement(Implementor<T> implementor) {
-        return implementor.implement(this);
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java
index d935255..56d77aa 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/relation/RelToGraphConverter.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.RelOp;
@@ -52,44 +53,44 @@ public class RelToGraphConverter implements RelOp<IgniteRel, RelGraph>
{
         }
     }
 
-    private final class Implementor implements org.apache.ignite.internal.processors.query.calcite.rel.Implementor<Item>
{
-        @Override public Item implement(IgniteFilter rel) {
+    private final class ItemTranslator implements IgniteRelVisitor<Item> {
+        @Override public Item visit(IgniteFilter rel) {
             return new Item(graph.addNode(curParent, FilterNode.create(rel, rexTranslator)),
Commons.cast(rel.getInputs()));
         }
 
-        @Override public Item implement(IgniteJoin rel) {
+        @Override public Item visit(IgniteJoin rel) {
             return new Item(graph.addNode(curParent, JoinNode.create(rel, rexTranslator)),
Commons.cast(rel.getInputs()));
         }
 
-        @Override public Item implement(IgniteProject rel) {
+        @Override public Item visit(IgniteProject rel) {
             return new Item(graph.addNode(curParent, ProjectNode.create(rel, rexTranslator)),
Commons.cast(rel.getInputs()));
         }
 
-        @Override public Item implement(IgniteTableScan rel) {
+        @Override public Item visit(IgniteTableScan rel) {
             return new Item(graph.addNode(curParent, TableScanNode.create(rel)), Commons.cast(rel.getInputs()));
         }
 
-        @Override public Item implement(IgniteReceiver rel) {
+        @Override public Item visit(IgniteReceiver rel) {
             return new Item(graph.addNode(curParent, ReceiverNode.create(rel)), Collections.emptyList());
         }
 
-        @Override public Item implement(IgniteSender rel) {
+        @Override public Item visit(IgniteSender rel) {
             return new Item(graph.addNode(curParent, SenderNode.create(rel)), Commons.cast(rel.getInputs()));
         }
 
-        @Override public Item implement(IgniteExchange rel) {
-            throw new UnsupportedOperationException();
+        @Override public Item visit(IgniteRel rel) {
+            return rel.accept(this);
         }
 
-        @Override public Item implement(IgniteRel other) {
-            throw new AssertionError();
+        @Override public Item visit(IgniteExchange rel) {
+            throw new AssertionError("Unexpected node: " + rel);
         }
     }
 
     @Override public RelGraph go(IgniteRel root) {
         graph = new RelGraph();
 
-        Implementor implementor = new Implementor();
+        ItemTranslator itemTranslator = new ItemTranslator();
         Deque<Item> stack = new ArrayDeque<>();
         stack.push(new Item(-1, F.asList(root)));
 
@@ -99,7 +100,7 @@ public class RelToGraphConverter implements RelOp<IgniteRel, RelGraph>
{
             curParent = item.parentId;
 
             for (IgniteRel child : item.children) {
-                stack.push(implementor.go(child));
+                stack.push(itemTranslator.visit(child));
             }
         }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
index f2ea67a..8dd3678 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
@@ -19,53 +19,104 @@ package org.apache.ignite.internal.processors.query.calcite.splitter;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import org.apache.calcite.linq4j.Ord;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteReceiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelShuttle;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteSender;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.RelOp;
+
+import static org.apache.ignite.internal.processors.query.calcite.util.Commons.igniteRel;
 
 /**
  *
  */
-public class Splitter extends IgniteRelShuttle {
+public class Splitter implements IgniteRelVisitor<IgniteRel>, RelOp<IgniteRel, QueryPlan>
{
     private List<Fragment> fragments;
 
-    public QueryPlan go(IgniteRel root) {
+    @Override public QueryPlan go(IgniteRel root) {
         fragments = new ArrayList<>();
 
-        fragments.add(new Fragment(root.accept(this)));
+        fragments.add(new Fragment(visit(root)));
 
         Collections.reverse(fragments);
 
         return new QueryPlan(fragments);
     }
 
-    @Override public RelNode visit(IgniteExchange rel) {
-        RelNode input = rel.getInput();
-        RelOptCluster cluster = input.getCluster();
-        RelTraitSet inputTraits = input.getTraitSet();
-        RelTraitSet outputTraits = rel.getTraitSet();
+    @Override public IgniteRel visit(IgniteExchange rel) {
+        RelOptCluster cluster = rel.getCluster();
+        RelTraitSet outTraits = rel.getTraitSet();
+
+        IgniteRel input = visit(igniteRel(rel.getInput()));
+        RelTraitSet inTraits = input.getTraitSet();
+
+        Fragment fragment = new Fragment(new IgniteSender(cluster, inTraits, input));
 
-        IgniteSender sender = new IgniteSender(cluster, inputTraits, visit(input));
-        Fragment fragment = new Fragment(sender);
         fragments.add(fragment);
 
-        return new IgniteReceiver(cluster, outputTraits, sender.getRowType(), fragment);
+        return new IgniteReceiver(cluster, outTraits, input.getRowType(), fragment);
+    }
+
+    @Override public IgniteRel visit(IgniteFilter rel) {
+        return visitChild(rel);
     }
 
-    @Override public RelNode visit(IgniteReceiver rel) {
+    @Override public IgniteRel visit(IgniteProject rel) {
+        return visitChild(rel);
+    }
+
+    @Override public IgniteRel visit(IgniteJoin rel) {
+        return visitChildren(rel);
+    }
+
+    @Override public IgniteRel visit(IgniteTableScan rel) {
+        return rel;
+    }
+
+    @Override public IgniteRel visit(IgniteRel rel) {
+        return rel.accept(this);
+    }
+
+    @Override public IgniteRel visit(IgniteReceiver rel) {
         throw new AssertionError("An attempt to split an already split task.");
     }
 
-    @Override public RelNode visit(IgniteSender rel) {
+    @Override public IgniteRel visit(IgniteSender rel) {
         throw new AssertionError("An attempt to split an already split task.");
     }
 
-    @Override protected RelNode visitOther(RelNode rel) {
-        throw new AssertionError("Unexpected node: " + rel);
+    private IgniteRel visitChildren(IgniteRel rel) {
+        for (Ord<RelNode> input : Ord.zip(rel.getInputs()))
+            visitChild(rel, input.i, igniteRel(input.e));
+
+        return rel;
+    }
+
+    /**
+     * Visits a single child of a parent.
+     */
+    private <T extends SingleRel & IgniteRel> IgniteRel visitChild(T rel) {
+        visitChild(rel, 0, igniteRel(rel.getInput()));
+
+        return rel;
+    }
+
+    /**
+     * Visits a particular child of a parent.
+     */
+    private void visitChild(IgniteRel parent, int i, IgniteRel child) {
+        IgniteRel child2 = visit(child);
+        if (child2 != child)
+            parent.replaceInput(i, child2);
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index 49d6091..1eb5c76 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -35,6 +35,8 @@ import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.QueryContext;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.type.RowType;
 import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.NotNull;
@@ -133,4 +135,11 @@ public final class Commons {
     public static PlannerContext plannerContext(Context ctx) {
         return Objects.requireNonNull(ctx.unwrap(PlannerContext.class));
     }
+
+    public static IgniteRel igniteRel(RelNode rel) {
+        if (rel.getConvention() != IgniteConvention.INSTANCE)
+            throw new AssertionError("Unexpected node: " + rel);
+
+        return (IgniteRel) rel;
+    }
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index ee4c22e..f0afa1a 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -40,7 +40,7 @@ import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.calcite.exec.ConsumerNode;
-import org.apache.ignite.internal.processors.query.calcite.exec.ImplementorImpl;
+import org.apache.ignite.internal.processors.query.calcite.exec.Implementor;
 import org.apache.ignite.internal.processors.query.calcite.exec.Node;
 import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
@@ -52,8 +52,6 @@ import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerPhase;
 import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerType;
 import org.apache.ignite.internal.processors.query.calcite.prepare.Query;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.Implementor;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.serialize.expression.Expression;
@@ -65,7 +63,6 @@ import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.type.RowType;
-import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
@@ -75,6 +72,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.ignite.internal.processors.query.calcite.util.Commons.igniteRel;
+
 /**
  *
  */
@@ -178,7 +177,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = Commons.plannerContext(ctx).query();
+            Query query = ctx.query();
 
             assertNotNull(query);
 
@@ -217,7 +216,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = Commons.plannerContext(ctx).query();
+            Query query = ctx.query();
 
             assertNotNull(query);
 
@@ -254,7 +253,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = Commons.plannerContext(ctx).query();
+            Query query = ctx.query();
 
             assertNotNull(query);
 
@@ -293,7 +292,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = Commons.plannerContext(ctx).query();
+            Query query = ctx.query();
 
             assertNotNull(query);
 
@@ -340,7 +339,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = Commons.plannerContext(ctx).query();
+            Query query = ctx.query();
 
             assertNotNull(query);
 
@@ -400,7 +399,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = Commons.plannerContext(ctx).query();
+            Query query = ctx.query();
 
             assertNotNull(planner);
 
@@ -433,7 +432,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
 
             plan.init(ctx);
 
-            RelGraph graph = new RelToGraphConverter().go((IgniteRel) plan.fragments().get(1).root());
+            RelGraph graph = new RelToGraphConverter().go(igniteRel(plan.fragments().get(1).root()));
 
             convertedBytes = new JdkMarshaller().marshal(graph);
 
@@ -481,7 +480,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = Commons.plannerContext(ctx).query();
+            Query query = ctx.query();
 
             assertNotNull(planner);
 
@@ -511,7 +510,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
 
         assertNotNull(relRoot);
 
-        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+        QueryPlan plan = new Splitter().go(igniteRel(relRoot.rel));
 
         assertNotNull(plan);
 
@@ -543,7 +542,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = Commons.plannerContext(ctx).query();
+            Query query = ctx.query();
 
             assertNotNull(planner);
 
@@ -569,9 +568,9 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
 
             Map<String, Object> params = ctx.query().params(F.asMap(ContextValue.QUERY_ID.valueName(),
new GridCacheVersion()));
 
-            Implementor<Node<Object[]>> implementor = new ImplementorImpl(new
DataContextImpl(params, ctx));
+            Implementor implementor = new Implementor(new DataContextImpl(params, ctx));
 
-            Node<Object[]> exec = implementor.go((IgniteRel) phys);
+            Node<Object[]> exec = implementor.go(igniteRel(phys));
 
             assertNotNull(exec);
 
@@ -634,7 +633,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = Commons.plannerContext(ctx).query();
+            Query query = ctx.query();
 
             assertNotNull(planner);
 
@@ -664,7 +663,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
 
         assertNotNull(relRoot);
 
-        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+        QueryPlan plan = new Splitter().go(igniteRel(relRoot.rel));
 
         assertNotNull(plan);
 
@@ -725,7 +724,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = Commons.plannerContext(ctx).query();
+            Query query = ctx.query();
 
             assertNotNull(planner);
 
@@ -755,7 +754,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
 
         assertNotNull(relRoot);
 
-        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+        QueryPlan plan = new Splitter().go(igniteRel(relRoot.rel));
 
         assertNotNull(plan);
 
@@ -816,7 +815,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = Commons.plannerContext(ctx).query();
+            Query query = ctx.query();
 
             assertNotNull(planner);
 
@@ -846,7 +845,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
 
         assertNotNull(relRoot);
 
-        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+        QueryPlan plan = new Splitter().go(igniteRel(relRoot.rel));
 
         assertNotNull(plan);
 
@@ -900,7 +899,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = Commons.plannerContext(ctx).query();
+            Query query = ctx.query();
 
             assertNotNull(planner);
 
@@ -930,7 +929,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
 
         assertNotNull(relRoot);
 
-        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+        QueryPlan plan = new Splitter().go(igniteRel(relRoot.rel));
 
         assertNotNull(plan);
 
@@ -991,7 +990,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = Commons.plannerContext(ctx).query();
+            Query query = ctx.query();
 
             assertNotNull(planner);
 
@@ -1021,7 +1020,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
 
         assertNotNull(relRoot);
 
-        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+        QueryPlan plan = new Splitter().go(igniteRel(relRoot.rel));
 
         assertNotNull(plan);
 
@@ -1082,7 +1081,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = Commons.plannerContext(ctx).query();
+            Query query = ctx.query();
 
             assertNotNull(planner);
 
@@ -1112,7 +1111,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest
{
 
         assertNotNull(relRoot);
 
-        QueryPlan plan = new Splitter().go((IgniteRel) relRoot.rel);
+        QueryPlan plan = new Splitter().go(igniteRel(relRoot.rel));
 
         assertNotNull(plan);
 
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
index a08142c..61c8a7d 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
@@ -107,11 +107,13 @@ public class OutboxTest extends GridCommonAbstractTest {
         private List<?> lastBatch;
 
 
-        @Override public void register(Outbox outbox) {
+        @Override public <T> Outbox<T> register(Outbox<T> outbox) {
             registered = true;
+
+            return outbox;
         }
 
-        @Override public void unregister(Outbox outbox) {
+        @Override public <T> void unregister(Outbox<T> outbox) {
             unregistered = true;
         }
 
@@ -120,6 +122,18 @@ public class OutboxTest extends GridCommonAbstractTest {
 
             lastBatch = rows;
         }
+
+        @Override public Inbox register(Inbox inbox) {
+            throw new AssertionError();
+        }
+
+        @Override public void unregister(Inbox inbox) {
+            throw new AssertionError();
+        }
+
+        @Override public void acknowledge(GridCacheVersion queryId, long exchangeId, UUID
nodeId, int batchId) {
+            throw new AssertionError();
+        }
     }
 
     private static class TestSource implements Source {


Mime
View raw message