ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvvinbl...@apache.org
Subject [ignite] branch ignite-12248 updated: refactoring
Date Mon, 18 Nov 2019 15:48:30 GMT
This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new 4584e07  refactoring
4584e07 is described below

commit 4584e07094feff6284f98ef746bf4ccc26d003a9
Author: Igor Seliverstov <gvvinblade@gmail.com>
AuthorDate: Mon Nov 18 18:48:21 2019 +0300

    refactoring
---
 .../query/calcite/cluster/RegistryImpl.java        |  44 ++++-----
 .../query/calcite/metadata/FragmentInfo.java       |  90 +++++++++++++++++
 .../query/calcite/metadata/FragmentLocation.java   |  68 -------------
 ...mentLocation.java => IgniteMdFragmentInfo.java} |  84 ++++------------
 .../query/calcite/metadata/IgniteMetadata.java     |  14 +--
 .../query/calcite/metadata/RelMetadataQueryEx.java |  10 +-
 .../query/calcite/prepare/IgnitePlanner.java       |  10 +-
 .../query/calcite/rel/IgniteTableScan.java         |   7 +-
 .../processors/query/calcite/rel/Receiver.java     |  45 +++------
 .../processors/query/calcite/rel/Sender.java       |  52 +++-------
 .../query/calcite/schema/IgniteTable.java          |  10 +-
 .../query/calcite/serialize/CallExpression.java    |  49 +++++++++
 .../serialize/{Node.java => Expression.java}       |   7 +-
 .../processors/query/calcite/serialize/Graph.java  |  63 ++++++++++++
 .../serialize/{Node.java => GraphNode.java}        |   5 +-
 .../{Node.java => InputRefExpression.java}         |  18 +++-
 .../{Node.java => LiteralExpression.java}          |  18 +++-
 .../{Node.java => LocalRefExpression.java}         |  19 +++-
 .../calcite/serialize/RelToGraphConverter.java     |  81 +++++++++++++++
 .../calcite/serialize/RelToNodeConverter.java      | 109 ---------------------
 .../calcite/serialize/RexToExpTranslator.java      |  97 ++++++++++++++++++
 .../query/calcite/splitter/Fragment.java           |  51 +++++-----
 .../query/calcite/splitter/QueryPlan.java          |  15 ++-
 .../query/calcite/splitter/Splitter.java           |  12 ++-
 .../calcite/trait/DestinationFunctionFactory.java  |   9 +-
 .../query/calcite/trait/DistributionTrait.java     |   2 +-
 .../query/calcite/trait/IgniteDistributions.java   |  20 ++--
 .../query/calcite/util/IgniteMethod.java           |   4 +-
 .../query/calcite/util/IgniteRelShuttle.java       |   2 +-
 .../query/calcite/CalciteQueryProcessorTest.java   |  11 +++
 30 files changed, 592 insertions(+), 434 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
index 1736da0..48c9618 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
@@ -20,15 +20,16 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.function.ToIntFunction;
+import org.apache.calcite.plan.Context;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
-import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
 import org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
@@ -50,13 +51,14 @@ public class RegistryImpl implements DistributionRegistry, LocationRegistry {
     }
 
     @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
-        if (ctx.cache().context().cacheContext(cacheId).isReplicated())
+        CacheGroupContext grp = ctx.cache().context().cacheContext(cacheId).group();
+
+        if (grp.isReplicated())
             return IgniteDistributions.broadcast();
 
-        Object key = ctx.cache().context().affinity().affinity(cacheId).similarAffinityKey();
-        ToIntFunction<Object> partFun = ctx.cache().context().cacheContext(cacheId).affinity()::partition;
+        Object key = grp.affinity().similarAffinityKey();
 
-        return IgniteDistributions.hash(rowType.distributionKeys(), new AffinityFactory(partFun, key));
+        return IgniteDistributions.hash(rowType.distributionKeys(), new AffinityFactory(cacheId, key));
     }
 
     @Override public NodesMapping local() {
@@ -140,18 +142,18 @@ public class RegistryImpl implements DistributionRegistry, LocationRegistry {
     }
 
     private static class AffinityFactory implements DestinationFunctionFactory {
-        private final ToIntFunction<Object> partFun;
+        private final int cacheId;
         private final Object key;
 
-        AffinityFactory(ToIntFunction<Object> partFun, Object key) {
-            this.partFun = partFun;
+        AffinityFactory(int cacheId, Object key) {
+            this.cacheId = cacheId;
             this.key = key;
         }
 
-        @Override public DestinationFunction create(FragmentLocation targetLocation, ImmutableIntList keys) {
-            assert keys.size() == 1 && targetLocation.mapping() != null && !F.isEmpty(targetLocation.mapping().assignments());
+        @Override public DestinationFunction create(Context ctx, NodesMapping mapping, ImmutableIntList keys) {
+            assert keys.size() == 1 && mapping != null && !F.isEmpty(mapping.assignments());
 
-            List<List<ClusterNode>> assignments = targetLocation.mapping().assignments();
+            List<List<ClusterNode>> assignments = mapping.assignments();
 
             if (U.assertionsEnabled()) {
                 for (List<ClusterNode> assignment : assignments) {
@@ -159,24 +161,14 @@ public class RegistryImpl implements DistributionRegistry, LocationRegistry {
                 }
             }
 
-            return create(assignments, partFun, keys.getInt(0));
-        }
-
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            return key.equals(((AffinityFactory) o).key);
-        }
+            ToIntFunction<Object> rowToPart = ctx.unwrap(GridKernalContext.class)
+                .cache().context().cacheContext(cacheId).affinity()::partition;
 
-        @Override public int hashCode() {
-            return key.hashCode();
+            return row -> assignments.get(rowToPart.applyAsInt(((Object[]) row)[keys.getInt(0)]));
         }
 
-        private static DestinationFunction create(List<List<ClusterNode>> assignments, ToIntFunction<Object> partFun, int affField) {
-            return row -> assignments.get(partFun.applyAsInt(((Object[]) row)[affField]));
+        @Override public Object key() {
+            return key;
         }
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
new file mode 100644
index 0000000..6d6aa1b
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentInfo.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.metadata;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.processors.query.calcite.splitter.Fragment;
+
+/**
+ *
+ */
+public class FragmentInfo {
+    private final NodesMapping mapping;
+    private final ImmutableList<Fragment> remoteInputs;
+    private final ImmutableIntList localInputs;
+
+    public FragmentInfo(Fragment remoteInput) {
+        this(null, ImmutableList.of(remoteInput), null);
+    }
+
+    public FragmentInfo(int localInput, NodesMapping mapping) {
+        this(ImmutableIntList.of(localInput), null, mapping);
+    }
+
+    public FragmentInfo(ImmutableIntList localInputs, ImmutableList<Fragment> remoteInputs, NodesMapping mapping) {
+        this.localInputs = localInputs;
+        this.remoteInputs = remoteInputs;
+        this.mapping = mapping;
+    }
+
+    public NodesMapping mapping() {
+        return mapping;
+    }
+
+    public ImmutableList<Fragment> remoteInputs() {
+        return remoteInputs;
+    }
+
+    public ImmutableIntList localInputs() {
+        return localInputs;
+    }
+
+    public FragmentInfo merge(FragmentInfo other) throws LocationMappingException {
+        return new FragmentInfo(
+            merge(localInputs(), other.localInputs()),
+            merge(remoteInputs(), other.remoteInputs()),
+            merge(mapping(), other.mapping()));
+    }
+
+    private static NodesMapping merge(NodesMapping left, NodesMapping right) throws LocationMappingException {
+        if (left == null)
+            return right;
+        if (right == null)
+            return left;
+
+        return left.mergeWith(right);
+    }
+
+    private static <T> ImmutableList<T> merge(ImmutableList<T> left, ImmutableList<T> right) {
+        if (left == null)
+            return right;
+        if (right == null)
+            return left;
+
+        return ImmutableList.<T>builder().addAll(left).addAll(right).build();
+    }
+
+    private static ImmutableIntList merge(ImmutableIntList left, ImmutableIntList right) {
+        if (left == null)
+            return right;
+        if (right == null)
+            return left;
+
+        return left.appendAll(right);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
deleted file mode 100644
index 26c203e..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentLocation.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
- * Licensed under the GridGain Community Edition License (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.metadata;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
-
-/**
- *
- */
-public class FragmentLocation {
-    private NodesMapping mapping;
-
-    private final ImmutableList<Receiver> remoteInputs;
-    private final ImmutableIntList localInputs;
-    private final AffinityTopologyVersion topVer;
-
-    public FragmentLocation(ImmutableList<Receiver> remoteInputs, AffinityTopologyVersion topVer) {
-        this(null, remoteInputs, null, topVer);
-    }
-
-    public FragmentLocation(NodesMapping mapping, ImmutableIntList localInputs, AffinityTopologyVersion topVer) {
-        this(mapping, null, localInputs, topVer);
-    }
-
-    public FragmentLocation(NodesMapping mapping, ImmutableList<Receiver> remoteInputs, ImmutableIntList localInputs, AffinityTopologyVersion topVer) {
-        this.mapping = mapping;
-        this.remoteInputs = remoteInputs;
-        this.localInputs = localInputs;
-        this.topVer = topVer;
-    }
-
-    public NodesMapping mapping() {
-        return mapping;
-    }
-
-    public void mapping(NodesMapping mapping) {
-        this.mapping = mapping;
-    }
-
-    public ImmutableList<Receiver> remoteInputs() {
-        return remoteInputs;
-    }
-
-    public ImmutableIntList localInputs() {
-        return localInputs;
-    }
-
-    public AffinityTopologyVersion topologyVersion() {
-        return topVer;
-    }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentLocation.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
similarity index 51%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentLocation.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
index 5e6e8a9..a1f09a7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentLocation.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMdFragmentInfo.java
@@ -16,7 +16,6 @@
 
 package org.apache.ignite.internal.processors.query.calcite.metadata;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.volcano.RelSubset;
@@ -28,58 +27,50 @@ import org.apache.calcite.rel.metadata.MetadataHandler;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentLocationMetadata;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentMetadata;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
 import org.apache.ignite.internal.processors.query.calcite.util.Edge;
 import org.apache.ignite.internal.processors.query.calcite.util.IgniteMethod;
-import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  *
  */
-public class IgniteMdFragmentLocation implements MetadataHandler<FragmentLocationMetadata> {
+public class IgniteMdFragmentInfo implements MetadataHandler<FragmentMetadata> {
     public static final RelMetadataProvider SOURCE =
         ReflectiveRelMetadataProvider.reflectiveSource(
-            IgniteMethod.FRAGMENT_LOCATION.method(), new IgniteMdFragmentLocation());
+            IgniteMethod.FRAGMENT_INFO.method(), new IgniteMdFragmentInfo());
 
-    @Override public MetadataDef<FragmentLocationMetadata> getDef() {
-        return FragmentLocationMetadata.DEF;
+    @Override public MetadataDef<FragmentMetadata> getDef() {
+        return FragmentMetadata.DEF;
     }
 
-    public FragmentLocation getLocation(RelNode rel, RelMetadataQuery mq) {
+    public FragmentInfo getFragmentInfo(RelNode rel, RelMetadataQuery mq) {
         throw new AssertionError();
     }
 
-    public FragmentLocation getLocation(RelSubset rel, RelMetadataQuery mq) {
+    public FragmentInfo getFragmentInfo(RelSubset rel, RelMetadataQuery mq) {
         throw new AssertionError();
     }
 
-    public FragmentLocation getLocation(SingleRel rel, RelMetadataQuery mq) {
-        return location(rel.getInput(), mq);
+    public FragmentInfo getFragmentInfo(SingleRel rel, RelMetadataQuery mq) {
+        return fragmentInfo(rel.getInput(), mq);
     }
 
-    public FragmentLocation getLocation(Sender rel, RelMetadataQuery mq) {
-        return rel.location(mq);
-    }
-
-    public FragmentLocation getLocation(BiRel rel, RelMetadataQuery mq) {
+    public FragmentInfo getFragmentInfo(BiRel rel, RelMetadataQuery mq) {
         mq = RelMetadataQueryEx.wrap(mq);
 
-        FragmentLocation leftLoc = location(rel.getLeft(), mq);
-        FragmentLocation rightLoc = location(rel.getRight(), mq);
+        FragmentInfo left = fragmentInfo(rel.getLeft(), mq);
+        FragmentInfo right = fragmentInfo(rel.getRight(), mq);
 
         try {
-            return merge(leftLoc, rightLoc);
+            return left.merge(right);
         }
         catch (LocationMappingException e) {
             // a replicated cache is cheaper to redistribute
-            if (!leftLoc.mapping().hasPartitionedCaches())
+            if (!left.mapping().hasPartitionedCaches())
                 throw planningException(rel, e, true);
-            else if (!rightLoc.mapping().hasPartitionedCaches())
+            else if (!right.mapping().hasPartitionedCaches())
                 throw planningException(rel, e, false);
 
             // both sub-trees have partitioned sources, less cost is better
@@ -101,50 +92,15 @@ public class IgniteMdFragmentLocation implements MetadataHandler<FragmentLocatio
         return new OptimisticPlanningException(msg, new Edge(rel, rel.getRight(), 1), cause);
     }
 
-    public FragmentLocation getLocation(Receiver rel, RelMetadataQuery mq) {
-        return new FragmentLocation(ImmutableList.of(rel),
-            rel.getCluster().getPlanner().getContext().unwrap(AffinityTopologyVersion.class));
+    public FragmentInfo getFragmentInfo(Receiver rel, RelMetadataQuery mq) {
+        return new FragmentInfo(rel.source());
     }
 
-    public FragmentLocation getLocation(IgniteTableScan rel, RelMetadataQuery mq) {
-        return rel.location();
+    public FragmentInfo getFragmentInfo(IgniteTableScan rel, RelMetadataQuery mq) {
+        return rel.fragmentInfo();
     }
 
-    public static FragmentLocation location(RelNode rel, RelMetadataQuery mq) {
+    public static FragmentInfo fragmentInfo(RelNode rel, RelMetadataQuery mq) {
         return RelMetadataQueryEx.wrap(mq).getFragmentLocation(rel);
     }
-
-    private static FragmentLocation merge(FragmentLocation left, FragmentLocation right) throws LocationMappingException {
-        return new FragmentLocation(merge(left.mapping(), right.mapping()),
-            merge(left.remoteInputs(), right.remoteInputs()),
-            merge(left.localInputs(), right.localInputs()),
-            U.firstNotNull(left.topologyVersion(), right.topologyVersion()));
-    }
-
-    private static NodesMapping merge(NodesMapping left, NodesMapping right) throws LocationMappingException {
-        if (left == null)
-            return right;
-        if (right == null)
-            return left;
-
-        return left.mergeWith(right);
-    }
-
-    private static <T> ImmutableList<T> merge(ImmutableList<T> left, ImmutableList<T> right) {
-        if (left == null)
-            return right;
-        if (right == null)
-            return left;
-
-        return ImmutableList.<T>builder().addAll(left).addAll(right).build();
-    }
-
-    private static ImmutableIntList merge(ImmutableIntList left, ImmutableIntList right) {
-        if (left == null)
-            return right;
-        if (right == null)
-            return left;
-
-        return left.appendAll(right);
-    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
index d12c19c..7fe6fbf 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/IgniteMetadata.java
@@ -36,7 +36,7 @@ public class IgniteMetadata {
         ChainedRelMetadataProvider.of(
             ImmutableList.of(
                 IgniteMdDistribution.SOURCE,
-                IgniteMdFragmentLocation.SOURCE,
+                IgniteMdFragmentInfo.SOURCE,
                 DefaultRelMetadataProvider.INSTANCE));
 
     public interface DistributionTraitMetadata extends Metadata {
@@ -52,16 +52,16 @@ public class IgniteMetadata {
         }
     }
 
-    public interface FragmentLocationMetadata extends Metadata {
-        MetadataDef<FragmentLocationMetadata> DEF = MetadataDef.of(FragmentLocationMetadata.class,
-            FragmentLocationMetadata.Handler.class, IgniteMethod.FRAGMENT_LOCATION.method());
+    public interface FragmentMetadata extends Metadata {
+        MetadataDef<FragmentMetadata> DEF = MetadataDef.of(FragmentMetadata.class,
+            FragmentMetadata.Handler.class, IgniteMethod.FRAGMENT_INFO.method());
 
         /** Determines how the rows are distributed. */
-        FragmentLocation getLocation();
+        FragmentInfo getFragmentInfo();
 
         /** Handler API. */
-        interface Handler extends MetadataHandler<FragmentLocationMetadata> {
-            FragmentLocation getLocation(RelNode r, RelMetadataQuery mq);
+        interface Handler extends MetadataHandler<FragmentMetadata> {
+            FragmentInfo getFragmentInfo(RelNode r, RelMetadataQuery mq);
         }
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
index 713900d..0a76b4d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/RelMetadataQueryEx.java
@@ -50,7 +50,7 @@ public class RelMetadataQueryEx extends RelMetadataQuery {
     }
 
     private IgniteMetadata.DistributionTraitMetadata.Handler distributionTraitHandler;
-    private IgniteMetadata.FragmentLocationMetadata.Handler sourceDistributionHandler;
+    private IgniteMetadata.FragmentMetadata.Handler sourceDistributionHandler;
 
     @SuppressWarnings("MethodOverridesStaticMethodOfSuperclass")
     public static RelMetadataQueryEx instance() {
@@ -82,15 +82,15 @@ public class RelMetadataQueryEx extends RelMetadataQuery {
         super(JaninoRelMetadataProvider.DEFAULT, RelMetadataQuery.EMPTY);
 
         distributionTraitHandler = initialHandler(IgniteMetadata.DistributionTraitMetadata.Handler.class);
-        sourceDistributionHandler = initialHandler(IgniteMetadata.FragmentLocationMetadata.Handler.class);
+        sourceDistributionHandler = initialHandler(IgniteMetadata.FragmentMetadata.Handler.class);
     }
 
-    public FragmentLocation getFragmentLocation(RelNode rel) {
+    public FragmentInfo getFragmentLocation(RelNode rel) {
         for (;;) {
             try {
-                return sourceDistributionHandler.getLocation(rel, this);
+                return sourceDistributionHandler.getFragmentInfo(rel, this);
             } catch (JaninoRelMetadataProvider.NoHandler e) {
-                sourceDistributionHandler = revise(e.relClass, IgniteMetadata.FragmentLocationMetadata.DEF);
+                sourceDistributionHandler = revise(e.relClass, IgniteMetadata.FragmentMetadata.DEF);
             }
         }
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
index 1ae2a33..0f37a01 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
@@ -72,7 +72,7 @@ import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata;
 import org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase;
 import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
-import org.apache.ignite.internal.processors.query.calcite.serialize.Node;
+import org.apache.ignite.internal.processors.query.calcite.serialize.Graph;
 
 /**
  *
@@ -219,16 +219,16 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
         return root;
     }
 
-    public RelNode convert(Node node) {
+    public RelNode convert(Graph graph) {
         ready();
 
-        return null;
+        return null; // TODO
     }
 
-    public Node convert(RelNode node) {
+    public Graph convert(RelNode node) {
         ready();
 
-        return null;
+        return null; // TODO
     }
 
     /** {@inheritDoc} */
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
index 969180d..536290d 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
@@ -22,7 +22,7 @@ import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableScan;
-import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
+import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
 
@@ -42,7 +42,8 @@ public final class IgniteTableScan extends TableScan implements IgniteRel {
     return implementor.implement(this);
   }
 
-  public FragmentLocation location() {
-    return getTable().unwrap(IgniteTable.class).location(getCluster().getPlanner().getContext());
+  public FragmentInfo fragmentInfo() {
+    return getTable().unwrap(IgniteTable.class)
+        .fragmentInfo(getCluster().getPlanner().getContext());
   }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
index 7e5747c..01a3dd7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Receiver.java
@@ -16,40 +16,27 @@
 
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
-import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.SingleRel;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
-import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdFragmentLocation;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
+import org.apache.calcite.rel.AbstractRelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.processors.query.calcite.splitter.Fragment;
 import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
 
 /**
  *
  */
-public final class Receiver extends SingleRel implements IgniteRel {
-    private FragmentLocation sourceDistribution;
+public final class Receiver extends AbstractRelNode implements IgniteRel {
+    private final Fragment source;
 
     /**
      * @param cluster Cluster this relational expression belongs to
      * @param traits Trait set.
-     * @param sender Corresponding sender.
      */
-    public Receiver(RelOptCluster cluster, RelTraitSet traits, Sender sender) {
-        super(cluster, traits, sender);
-    }
-
-    /** {@inheritDoc} */
-    @Override public Sender getInput() {
-        return (Sender) input;
-    }
-
-    /** {@inheritDoc} */
-    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-        return new Receiver(getCluster(), traitSet, (Sender) sole(inputs));
+    public Receiver(RelOptCluster cluster, RelTraitSet traits, RelDataType rowType, Fragment source) {
+        super(cluster, traits);
+        this.rowType = rowType;
+        this.source = source;
     }
 
     /** {@inheritDoc} */
@@ -57,17 +44,7 @@ public final class Receiver extends SingleRel implements IgniteRel {
         return implementor.implement(this);
     }
 
-    public void init(FragmentLocation targetDistribution, RelMetadataQuery mq) {
-        sourceDistribution = IgniteMdFragmentLocation.location(getInput(), mq);
-
-        getInput().init(targetDistribution, getTraitSet().getTrait(DistributionTraitDef.INSTANCE));
-    }
-
-    public FragmentLocation sourceDistribution() {
-        return sourceDistribution;
-    }
-
-    public void reset() {
-        sourceDistribution = null;
+    public Fragment source() {
+        return source;
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
index 959f2cb..98a70d7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/Sender.java
@@ -16,40 +16,35 @@
 
 package org.apache.ignite.internal.processors.query.calcite.rel;
 
-import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
-import org.apache.calcite.rel.metadata.RelMetadataQuery;
-import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
-import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdFragmentLocation;
+import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
+import org.jetbrains.annotations.NotNull;
 
 /**
  *
  */
 public final class Sender extends SingleRel implements IgniteRel {
-    private FragmentLocation location;
-    private FragmentLocation targetLocation;
-    private DistributionTrait targetDistribution;
-    private DestinationFunction destinationFunction;
+    private final DistributionTrait targetDistr;
+
+    private NodesMapping targetMapping;
 
     /**
      * Creates a <code>SingleRel</code>.
-     *
-     * @param cluster Cluster this relational expression belongs to
+     *  @param cluster Cluster this relational expression belongs to
      * @param traits Trait set.
      * @param input Input relational expression
+     * @param targetDistr Target distribution
      */
-    public Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
+    public Sender(RelOptCluster cluster, RelTraitSet traits, RelNode input, @NotNull DistributionTrait targetDistr) {
         super(cluster, traits, input);
-    }
 
-    @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-        return new Sender(getCluster(), traitSet, sole(inputs));
+        this.targetDistr = targetDistr;
     }
 
     /** {@inheritDoc} */
@@ -57,32 +52,11 @@ public final class Sender extends SingleRel implements IgniteRel {
         return implementor.implement(this);
     }
 
-    public void init(FragmentLocation targetLocation, DistributionTrait targetDistribution) {
-        this.targetLocation = targetLocation;
-        this.targetDistribution = targetDistribution;
-    }
-
-    public DestinationFunction targetFunction() {
-        if (destinationFunction == null) {
-            assert targetLocation != null && targetLocation.mapping() != null && targetDistribution != null;
-
-            destinationFunction = targetDistribution.destinationFunctionFactory().create(targetLocation, targetDistribution.keys());
-        }
-
-        return destinationFunction;
-    }
-
-    public FragmentLocation location(RelMetadataQuery mq) {
-        if (location == null)
-            location = IgniteMdFragmentLocation.location(getInput(), mq);
-
-        return location;
+    public void init(NodesMapping mapping) {
+        targetMapping = mapping;
     }
 
-    public void reset() {
-        location = null;
-        targetLocation = null;
-        targetDistribution = null;
-        destinationFunction = null;
+    public DestinationFunction targetFunction(org.apache.calcite.plan.Context ctx) {
+        return targetDistr.destinationFunctionFactory().create(ctx, targetMapping, targetDistr.keys());
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index 3ba7b50..d298e40 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -26,12 +26,10 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.schema.impl.AbstractTable;
-import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
-import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
+import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo;
 import org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
-import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
@@ -81,12 +79,10 @@ public class IgniteTable extends AbstractTable implements TranslatableTable {
         return distributionRegistry(context).distribution(CU.cacheId(cacheName), rowType);
     }
 
-    public FragmentLocation location(Context ctx) {
+    public FragmentInfo fragmentInfo(Context ctx) {
         int cacheId = CU.cacheId(cacheName);
-        AffinityTopologyVersion topVer = topologyVersion(ctx);
-        NodesMapping mapping = locationRegistry(ctx).distributed(cacheId, topVer);
 
-        return new FragmentLocation(mapping, ImmutableIntList.of(cacheId), topVer);
+        return new FragmentInfo(cacheId, locationRegistry(ctx).distributed(cacheId, topologyVersion(ctx)));
     }
 
     private LocationRegistry locationRegistry(Context ctx) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/CallExpression.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/CallExpression.java
new file mode 100644
index 0000000..f3e5bb9
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/CallExpression.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+
+/**
+ *
+ */
+public class CallExpression implements Expression {
+    private final RelDataType type;
+    private final SqlOperator op;
+    private final List<Expression> operands;
+
+    public CallExpression(RelDataType type, SqlOperator op, List<Expression> operands) {
+        this.type = type;
+        this.op = op;
+        this.operands = operands;
+    }
+
+    @Override public RexNode toRex(RexBuilder builder) {
+        ArrayList<RexNode> operands0 = new ArrayList<>(operands.size());
+
+        for (Expression operand : operands) {
+            operands0.add(operand.toRex(builder));
+        }
+
+        return builder.makeCall(type, op, operands0);
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Node.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Expression.java
similarity index 83%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Node.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Expression.java
index 172b9f4..abe2b36 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Node.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Expression.java
@@ -16,11 +16,12 @@
 
 package org.apache.ignite.internal.processors.query.calcite.serialize;
 
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
 
 /**
  *
  */
-public interface Node {
-    IgniteRel toRel(SerializationContext ctx);
+public interface Expression {
+    RexNode toRex(RexBuilder builder);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Graph.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Graph.java
new file mode 100644
index 0000000..54bc932
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Graph.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.util.GridIntList;
+
+/**
+ *
+ */
+public class Graph {
+    private final List<GraphNode> nodes = new ArrayList<>();
+    private final List<GridIntList> edges = new ArrayList<>();
+
+    int addNode(GraphNode node) {
+        assert nodes.size() == edges.size();
+
+        int id = nodes.size();
+
+        nodes.add(node);
+        edges.add(new GridIntList());
+
+        return id;
+    }
+
+    void addEdge(int parentId, int childId) {
+        edges.get(parentId).add(childId);
+    }
+
+    int addChild(int parentId, GraphNode node) {
+        int id = addNode(node);
+
+        edges.get(parentId).add(id);
+
+        return id;
+    }
+
+    List<GraphNode> children(int parentId) {
+        GridIntList childrenIds = edges.get(parentId);
+        ArrayList<GraphNode> children = new ArrayList<>(childrenIds.size());
+
+        for (int i = 0; i < childrenIds.size(); i++) {
+            children.add(nodes.get(i));
+        }
+
+        return children;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Node.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
similarity index 84%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Node.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
index 172b9f4..f2f533a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Node.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/GraphNode.java
@@ -16,11 +16,8 @@
 
 package org.apache.ignite.internal.processors.query.calcite.serialize;
 
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-
 /**
  *
  */
-public interface Node {
-    IgniteRel toRel(SerializationContext ctx);
+public interface GraphNode {
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Node.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java
similarity index 61%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Node.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java
index 172b9f4..421c4bb 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Node.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/InputRefExpression.java
@@ -16,11 +16,23 @@
 
 package org.apache.ignite.internal.processors.query.calcite.serialize;
 
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
 
 /**
  *
  */
-public interface Node {
-    IgniteRel toRel(SerializationContext ctx);
+public class InputRefExpression implements Expression {
+    private final RelDataType type;
+    private final int index;
+
+    public InputRefExpression(RelDataType type, int index) {
+        this.type = type;
+        this.index = index;
+    }
+
+    @Override public RexNode toRex(RexBuilder builder) {
+        return builder.makeInputRef(type, index);
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Node.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java
similarity index 60%
copy from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Node.java
copy to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java
index 172b9f4..923fff4 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Node.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LiteralExpression.java
@@ -16,11 +16,23 @@
 
 package org.apache.ignite.internal.processors.query.calcite.serialize;
 
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexNode;
 
 /**
  *
  */
-public interface Node {
-    IgniteRel toRel(SerializationContext ctx);
+public class LiteralExpression implements Expression {
+    private final Comparable value;
+    private final RelDataType type;
+
+    public LiteralExpression(RelDataType type, Comparable value) {
+        this.value = value;
+        this.type = type;
+    }
+
+    @Override public RexNode toRex(RexBuilder builder) {
+        return builder.makeLiteral(value, type, false);
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Node.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LocalRefExpression.java
similarity index 59%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Node.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LocalRefExpression.java
index 172b9f4..b0947ac 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/Node.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/LocalRefExpression.java
@@ -16,11 +16,24 @@
 
 package org.apache.ignite.internal.processors.query.calcite.serialize;
 
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
 
 /**
  *
  */
-public interface Node {
-    IgniteRel toRel(SerializationContext ctx);
+public class LocalRefExpression implements Expression {
+    private final RelDataType type;
+    private final int index;
+
+    public LocalRefExpression(RelDataType type, int index) {
+        this.type = type;
+        this.index = index;
+    }
+
+    @Override public RexNode toRex(RexBuilder builder) {
+        return new RexLocalRef(index, type);
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToGraphConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToGraphConverter.java
new file mode 100644
index 0000000..c5eaae5
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToGraphConverter.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.List;
+import org.apache.calcite.rel.RelNode;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
+import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
+import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
+
+/**
+ *
+ */
+public class RelToGraphConverter implements Implementor<List<RelNode>> {
+    private Deque<List<RelNode>> stack1 = new ArrayDeque<>();
+    private Deque<Integer> stack2 = new ArrayDeque<>();
+    private Graph graph;
+
+    public Graph convert(RelNode root) {
+        stack1 = new ArrayDeque<>();
+        stack2 = new ArrayDeque<>();
+
+        graph = new Graph();
+
+        return null;
+    }
+
+    @Override public List<RelNode> implement(IgniteExchange rel) {
+        return null;
+    }
+
+    @Override public List<RelNode> implement(IgniteFilter rel) {
+        return null;
+    }
+
+    @Override public List<RelNode> implement(IgniteJoin rel) {
+        return null;
+    }
+
+    @Override public List<RelNode> implement(IgniteProject rel) {
+        return null;
+    }
+
+    @Override public List<RelNode> implement(IgniteTableScan rel) {
+        return null;
+    }
+
+    @Override public List<RelNode> implement(Receiver rel) {
+        return null;
+    }
+
+    @Override public List<RelNode> implement(Sender rel) {
+        return null;
+    }
+
+    @Override public List<RelNode> implement(IgniteRel other) {
+        return null;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToNodeConverter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToNodeConverter.java
deleted file mode 100644
index a385adf..0000000
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RelToNodeConverter.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Copyright 2019 GridGain Systems, Inc. and Contributors.
- *
- * Licensed under the GridGain Community Edition License (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.query.calcite.serialize;
-
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteJoin;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteProject;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
-import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
-import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
-import org.apache.ignite.internal.processors.query.calcite.util.Implementor;
-
-/**
- *
- */
-public class RelToNodeConverter {
-    static class ImplementorImpl implements Implementor<Node> {
-
-        @Override public Node implement(IgniteExchange rel) {
-            return null;
-        }
-
-        @Override public Node implement(IgniteFilter rel) {
-            return null;
-        }
-
-        @Override public Node implement(IgniteJoin rel) {
-            return null;
-        }
-
-        @Override public Node implement(IgniteProject rel) {
-            return null;
-        }
-
-        @Override public Node implement(IgniteTableScan rel) {
-            return null;
-        }
-
-        @Override public Node implement(Receiver rel) {
-            return null;
-        }
-
-        @Override public Node implement(Sender rel) {
-            return null;
-        }
-
-        @Override public Node implement(IgniteRel other) {
-            return null;
-        }
-    }
-
-    static class ExchangeNode implements Node {
-        @Override public IgniteRel toRel(SerializationContext ctx) {
-            return null;
-        }
-    }
-
-    static class FilterNode implements Node {
-        @Override public IgniteRel toRel(SerializationContext ctx) {
-            return null;
-        }
-    }
-
-    static class HashJoinNode implements Node {
-        @Override public IgniteRel toRel(SerializationContext ctx) {
-            return null;
-        }
-    }
-
-    static class ProjectNode implements Node {
-        @Override public IgniteRel toRel(SerializationContext ctx) {
-            return null;
-        }
-    }
-
-    static class TableScanNode implements Node {
-        @Override public IgniteRel toRel(SerializationContext ctx) {
-            return null;
-        }
-    }
-
-    static class ReceiverNode implements Node {
-        @Override public IgniteRel toRel(SerializationContext ctx) {
-            return null;
-        }
-    }
-
-    static class SenderNode implements Node {
-        @Override public IgniteRel toRel(SerializationContext ctx) {
-            return null;
-        }
-    }
-}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RexToExpTranslator.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RexToExpTranslator.java
new file mode 100644
index 0000000..a30edfa
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/serialize/RexToExpTranslator.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.serialize;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.calcite.rex.RexDynamicParam;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexLocalRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexOver;
+import org.apache.calcite.rex.RexPatternFieldRef;
+import org.apache.calcite.rex.RexRangeRef;
+import org.apache.calcite.rex.RexSubQuery;
+import org.apache.calcite.rex.RexTableInputRef;
+import org.apache.calcite.rex.RexVisitor;
+
+/**
+ *
+ */
+public class RexToExpTranslator implements RexVisitor<Expression> {
+     @Override public Expression visitInputRef(RexInputRef inputRef) {
+        return new InputRefExpression(inputRef.getType(), inputRef.getIndex());
+    }
+
+    @Override public Expression visitLocalRef(RexLocalRef localRef) {
+        return new LocalRefExpression(localRef.getType(), localRef.getIndex());
+    }
+
+    @Override public Expression visitLiteral(RexLiteral literal) {
+        return new LiteralExpression(literal.getType(), literal.getValue());
+    }
+
+    @Override public Expression visitCall(RexCall call) {
+        return new CallExpression(call.getType(), call.getOperator(), visitList(call.getOperands()));
+    }
+
+    @Override public Expression visitOver(RexOver over) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override public Expression visitCorrelVariable(RexCorrelVariable correlVariable) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override public Expression visitDynamicParam(RexDynamicParam dynamicParam) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override public Expression visitRangeRef(RexRangeRef rangeRef) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override public Expression visitFieldAccess(RexFieldAccess fieldAccess) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override public Expression visitSubQuery(RexSubQuery subQuery) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override public Expression visitTableInputRef(RexTableInputRef fieldRef) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override public Expression visitPatternFieldRef(RexPatternFieldRef fieldRef) {
+        throw new UnsupportedOperationException();
+    }
+
+    public List<Expression> visitList(List<RexNode> operands) {
+        ArrayList<Expression> res = new ArrayList<>(operands.size());
+
+        for (RexNode operand : operands) {
+            res.add(operand.accept(this));
+        }
+
+        return res;
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
index 75bc37e..8e9d9ed 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
@@ -16,16 +16,18 @@
 
 package org.apache.ignite.internal.processors.query.calcite.splitter;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
-import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdFragmentLocation;
+import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdFragmentInfo;
 import org.apache.ignite.internal.processors.query.calcite.metadata.LocationMappingException;
 import org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
-import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
+import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
 import org.apache.ignite.internal.util.typedef.F;
 
@@ -35,29 +37,44 @@ import org.apache.ignite.internal.util.typedef.F;
 public class Fragment {
     public final RelNode rel;
 
-    public FragmentLocation fragmentLocation;
+    private NodesMapping mapping;
+    private ImmutableIntList localInputs;
+    private ImmutableList<Fragment> remoteInputs;
 
     public Fragment(RelNode rel) {
         this.rel = rel;
     }
 
     public void init(Context ctx, RelMetadataQuery mq) {
-        fragmentLocation = IgniteMdFragmentLocation.location(rel, mq);
+        init(null, ctx, mq);
+    }
+
+    public void init(Fragment parent, Context ctx, RelMetadataQuery mq) {
+        FragmentInfo info = IgniteMdFragmentInfo.fragmentInfo(rel, mq);
+
+        remoteInputs = info.remoteInputs();
+        localInputs = info.localInputs();
 
-        if (fragmentLocation.mapping() == null)
-            fragmentLocation.mapping(remote() ? registry(ctx).random(topologyVersion(ctx)) : registry(ctx).local());
+        if (info.mapping() == null)
+            mapping = remote() ? registry(ctx).random(topologyVersion(ctx)) : registry(ctx).local();
         else {
             try {
-                fragmentLocation.mapping(fragmentLocation.mapping().deduplicate());
+                mapping = info.mapping().deduplicate();
             }
             catch (LocationMappingException e) {
                 throw new IgniteSQLException("Failed to map fragment to location, partition lost.", e);
             }
         }
 
-        if (!F.isEmpty(fragmentLocation.remoteInputs())) {
-            for (Receiver input : fragmentLocation.remoteInputs())
-                input.init(fragmentLocation, mq);
+        if (parent != null) {
+            assert remote();
+
+            ((Sender)rel).init(parent.mapping);
+        }
+
+        if (!F.isEmpty(remoteInputs)) {
+            for (Fragment input : remoteInputs)
+                input.init(this, ctx, mq);
         }
     }
 
@@ -72,16 +89,4 @@ public class Fragment {
     private AffinityTopologyVersion topologyVersion(Context ctx) {
         return ctx.unwrap(AffinityTopologyVersion.class);
     }
-
-    public void reset() {
-        if (remote())
-            ((Sender) rel).reset();
-
-        if (fragmentLocation != null && !F.isEmpty(fragmentLocation.remoteInputs())) {
-            for (Receiver receiver : fragmentLocation.remoteInputs())
-                receiver.reset();
-        }
-
-        fragmentLocation = null;
-    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
index a95c7fd..ae37dfd 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
@@ -26,7 +26,9 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPl
 import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
 import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.util.Edge;
+import org.apache.ignite.internal.util.typedef.F;
 
 /**
  *
@@ -45,8 +47,7 @@ public class QueryPlan {
 
         while (true) {
             try {
-                for (Fragment fragment : fragments)
-                    fragment.init(ctx, mq);
+                F.first(fragments).init(ctx, mq);
 
                 break;
             }
@@ -54,9 +55,6 @@ public class QueryPlan {
                 if (++i > 3)
                     throw new IgniteSQLException("Failed to map query.", e);
 
-                for (Fragment fragment0 : fragments)
-                    fragment0.reset();
-
                 Edge edge = e.edge();
 
                 RelNode parent = edge.parent();
@@ -65,10 +63,11 @@ public class QueryPlan {
                 RelOptCluster cluster = child.getCluster();
                 RelTraitSet traitSet = child.getTraitSet();
 
-                Sender sender = new Sender(cluster, traitSet, child);
-                parent.replaceInput(edge.childIdx(), new Receiver(cluster, traitSet, sender));
+                Sender sender = new Sender(cluster, traitSet, child, traitSet.getTrait(DistributionTraitDef.INSTANCE));
+                Fragment fragment = new Fragment(sender);
+                fragments.add(fragment);
 
-                fragments.add(new Fragment(sender));
+                parent.replaceInput(edge.childIdx(), new Receiver(cluster, traitSet, sender.getRowType(), fragment));
             }
         }
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
index 5f751d3..d15adc7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Splitter.java
@@ -20,11 +20,13 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.util.IgniteRelShuttle;
 
 /**
@@ -45,12 +47,14 @@ public class Splitter extends IgniteRelShuttle {
 
     @Override public RelNode visit(IgniteExchange rel) {
         RelOptCluster cluster = rel.getCluster();
+        RelTraitSet inputTraits = rel.getInput().getTraitSet();
+        RelTraitSet outputTraits = rel.getTraitSet();
 
-        Sender sender = new Sender(cluster, rel.getInput().getTraitSet(), visit(rel.getInput()));
+        Sender sender = new Sender(cluster, inputTraits, visit(rel.getInput()), outputTraits.getTrait(DistributionTraitDef.INSTANCE));
+        Fragment fragment = new Fragment(sender);
+        fragments.add(fragment);
 
-        fragments.add(new Fragment(sender));
-
-        return new Receiver(cluster, rel.getTraitSet(), sender);
+        return new Receiver(cluster, outputTraits, sender.getRowType(), fragment);
     }
 
     @Override public RelNode visit(Receiver rel) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
index d4dc4fc..d239c5c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
@@ -16,12 +16,17 @@
 
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
+import org.apache.calcite.plan.Context;
 import org.apache.calcite.util.ImmutableIntList;
-import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentLocation;
+import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
 
 /**
  *
  */
 public interface DestinationFunctionFactory {
-    DestinationFunction create(FragmentLocation targetLocation, ImmutableIntList keys);
+    DestinationFunction create(Context ctx, NodesMapping mapping, ImmutableIntList keys);
+
+    default Object key() {
+        return getClass();
+    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
index e733461..34773f2 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DistributionTrait.java
@@ -90,7 +90,7 @@ public final class DistributionTrait implements RelTrait {
         if (type() == other.type())
             return type() != DistributionType.HASH
                 || (Objects.equals(keys(), other.keys())
-                    && Objects.equals(destinationFunctionFactory(), other.destinationFunctionFactory()));
+                    && Objects.equals(destinationFunctionFactory().key(), other.destinationFunctionFactory().key()));
 
         return other.type() == DistributionType.RANDOM && type() == DistributionType.HASH;
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
index 1237421..8363bbd 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/IgniteDistributions.java
@@ -37,9 +37,9 @@ import static org.apache.ignite.internal.processors.query.calcite.trait.Distribu
  *
  */
 public class IgniteDistributions {
-    private static final DestinationFunctionFactory NO_OP_FACTORY = (t,k) -> null;
-    private static final DestinationFunctionFactory HASH_FACTORY = (t,k) -> {
-        assert t.mapping() != null && !F.isEmpty(t.mapping().assignments());
+    private static final DestinationFunctionFactory NO_OP_FACTORY = (ctx, m, k) -> null;
+    private static final DestinationFunctionFactory HASH_FACTORY = (ctx, m, k) -> {
+        assert m != null && !F.isEmpty(m.assignments());
 
         int[] fields = k.toIntArray();
 
@@ -57,7 +57,7 @@ public class IgniteDistributions {
             return hash;
         };
 
-        List<List<ClusterNode>> assignments = t.mapping().assignments();
+        List<List<ClusterNode>> assignments = m.assignments();
 
         if (U.assertionsEnabled()) {
             for (List<ClusterNode> assignment : assignments) {
@@ -99,24 +99,24 @@ public class IgniteDistributions {
     }
 
     public static DestinationFunctionFactory singleTargetFunction() {
-        return (t, k) -> {
-            List<ClusterNode> nodes = t.mapping().nodes().subList(0, 1);
+        return (ctx, m, k) -> {
+            List<ClusterNode> nodes = m.nodes().subList(0, 1);
 
             return r -> nodes;
         };
     }
 
     public static DestinationFunctionFactory allTargetsFunction() {
-        return (t, k) -> {
-            List<ClusterNode> nodes = t.mapping().nodes();
+        return (ctx, m, k) -> {
+            List<ClusterNode> nodes = m.nodes();
 
             return r -> nodes;
         };
     }
 
     public static DestinationFunctionFactory randomTargetFunction() {
-        return (t, k) -> {
-            List<ClusterNode> nodes = t.mapping().nodes();
+        return (ctx, m, k) -> {
+            List<ClusterNode> nodes = m.nodes();
 
             return r -> Collections.singletonList(nodes.get(ThreadLocalRandom.current().nextInt(nodes.size())));
         };
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
index 94109e9..c7a7081 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteMethod.java
@@ -20,14 +20,14 @@ package org.apache.ignite.internal.processors.query.calcite.util;
 import java.lang.reflect.Method;
 import org.apache.calcite.linq4j.tree.Types;
 import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.DistributionTraitMetadata;
-import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentLocationMetadata;
+import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMetadata.FragmentMetadata;
 
 /**
  *
  */
 public enum IgniteMethod {
     DISTRIBUTION_TRAIT(DistributionTraitMetadata.class, "getDistributionTrait"),
-    FRAGMENT_LOCATION(FragmentLocationMetadata.class, "getLocation");
+    FRAGMENT_INFO(FragmentMetadata.class, "getFragmentInfo");
 
     private final Method method;
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java
index d4b0dc6..eb1129a 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/IgniteRelShuttle.java
@@ -43,7 +43,7 @@ public class IgniteRelShuttle extends RelShuttleImpl {
     }
 
     public RelNode visit(Receiver rel) {
-        return visitChild(rel, 0, rel.getInput());
+        return rel;
     }
 
     public RelNode visit(Sender rel) {
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index df532ad..1891f44 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -29,6 +29,7 @@ import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.tools.Frameworks;
@@ -45,6 +46,8 @@ import org.apache.ignite.internal.processors.query.calcite.rule.PlannerType;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteSchema;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.schema.RowType;
+import org.apache.ignite.internal.processors.query.calcite.serialize.Expression;
+import org.apache.ignite.internal.processors.query.calcite.serialize.RexToExpTranslator;
 import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
 import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
@@ -333,6 +336,14 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         }
 
         assertNotNull(relRoot.rel);
+
+        RexToExpTranslator translator = new RexToExpTranslator();
+
+        Project proj = (Project) relRoot.rel.getInput(0);
+
+        List<Expression> expressions = translator.visitList(proj.getProjects());
+
+        assertNotNull(expressions);
     }
 
     @Test


Mime
View raw message