ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvvinbl...@apache.org
Subject [ignite] branch ignite-12248 updated: context refactoring
Date Mon, 02 Dec 2019 18:43:10 GMT
This is an automated email from the ASF dual-hosted git repository.

gvvinblade pushed a commit to branch ignite-12248
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-12248 by this push:
     new 1824145  context refactoring
1824145 is described below

commit 1824145fdfdd1e78fa8d78b1cc8cbe0984a46beb
Author: Igor Seliverstov <gvvinblade@gmail.com>
AuthorDate: Mon Dec 2 21:42:56 2019 +0300

    context refactoring
---
 .../query/calcite/CalciteQueryProcessor.java       |  45 +++--
 .../{RegistryImpl.java => MappingServiceImpl.java} |  59 +-----
 .../cluster/TableDistributionServiceImpl.java      |  87 ++++++++
 ...ExchangeService.java => ExchangeProcessor.java} |   2 +-
 .../processors/query/calcite/exchange/Outbox.java  |  18 +-
 .../{LocationRegistry.java => MappingService.java} |   2 +-
 ...Registry.java => TableDistributionService.java} |   2 +-
 .../query/calcite/prepare/DataContextImpl.java     |  14 +-
 .../calcite/prepare/DistributedExecution.java      |  10 +-
 .../query/calcite/prepare/IgnitePlanner.java       |   5 +-
 .../query/calcite/prepare/PlannerContext.java      | 213 ++++++++++++++++++++
 .../query/calcite/rel/IgniteTableScan.java         |   3 +-
 .../processors/query/calcite/rule/IgniteRules.java |   4 +-
 .../query/calcite/rule/PlannerPhase.java           |   8 +-
 .../query/calcite/schema/IgniteTable.java          |  31 +--
 .../query/calcite/splitter/Fragment.java           |  20 +-
 .../query/calcite/splitter/QueryPlan.java          |   4 +-
 .../processors/query/calcite/splitter/Source.java  |   4 +-
 .../query/calcite/trait/AllTargetsFactory.java     |   4 +-
 .../calcite/trait/DestinationFunctionFactory.java  |   4 +-
 .../query/calcite/trait/HashFunctionFactory.java   |   4 +-
 .../query/calcite/trait/NoOpFactory.java           |   4 +-
 .../query/calcite/trait/RandomTargetFactory.java   |   4 +-
 .../query/calcite/trait/SingleTargetFactory.java   |   4 +-
 .../processors/query/calcite/util/Commons.java     |  25 +--
 .../query/calcite/CalciteQueryProcessorTest.java   | 223 ++++++++++++++-------
 .../query/calcite/exchange/OutboxTest.java         |   5 +-
 27 files changed, 548 insertions(+), 260 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 760a71c..c244f4e 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -19,11 +19,11 @@ package org.apache.ignite.internal.processors.query.calcite;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.function.BiFunction;
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.fun.SqlLibrary;
 import org.apache.calcite.sql.fun.SqlLibraryOperatorTableFactory;
 import org.apache.calcite.sql.parser.SqlParser;
@@ -36,9 +36,11 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryContext;
 import org.apache.ignite.internal.processors.query.QueryEngine;
-import org.apache.ignite.internal.processors.query.calcite.cluster.RegistryImpl;
+import org.apache.ignite.internal.processors.query.calcite.cluster.MappingServiceImpl;
+import org.apache.ignite.internal.processors.query.calcite.cluster.TableDistributionServiceImpl;
 import org.apache.ignite.internal.processors.query.calcite.prepare.DistributedExecution;
 import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
 import org.apache.ignite.internal.processors.query.calcite.prepare.Query;
 import org.apache.ignite.internal.processors.query.calcite.prepare.QueryExecution;
 import org.apache.ignite.internal.processors.query.calcite.schema.CalciteSchemaHolder;
@@ -49,8 +51,6 @@ import org.apache.ignite.resources.LoggerResource;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.ignite.internal.processors.query.calcite.util.Commons.provided;
-
 /**
  *
  */
@@ -110,7 +110,7 @@ public class CalciteQueryProcessor implements QueryEngine {
     }
 
     @Override public List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext ctx, String query, Object... params) throws IgniteSQLException {
-        Context context = context(Commons.convert(ctx), query, params);
+        PlannerContext context = context(Commons.convert(ctx), query, params, this::buildContext);
         QueryExecution execution = prepare(context);
         FieldsQueryCursor<List<?>> cur = execution.execute();
         return Collections.singletonList(cur);
@@ -124,16 +124,12 @@ public class CalciteQueryProcessor implements QueryEngine {
         return log;
     }
 
-    public GridKernalContext context() {
-        return kernalContext;
-    }
-
     /** */
-    public IgnitePlanner planner(RelTraitDef[] traitDefs, Context ctx) {
+    public IgnitePlanner planner(RelTraitDef[] traitDefs, PlannerContext ctx0) {
         FrameworkConfig cfg = Frameworks.newConfigBuilder(config())
-                .defaultSchema(ctx.unwrap(SchemaPlus.class))
+                .defaultSchema(ctx0.schema())
                 .traitDefs(traitDefs)
-                .context(ctx)
+                .context(ctx0)
                 .build();
 
         return new IgnitePlanner(cfg);
@@ -145,16 +141,25 @@ public class CalciteQueryProcessor implements QueryEngine {
      * @param params Query parameters.
      * @return Query execution context.
      */
-    Context context(@NotNull Context ctx, String query, Object[] params) { // Package private visibility for tests.
-        return Contexts.chain(ctx, config.getContext(),
-            Contexts.of(
-                new Query(query, params),
-                new RegistryImpl(kernalContext),
-                provided(ctx, SchemaPlus.class, schemaHolder::schema),
-                provided(ctx, AffinityTopologyVersion.class, this::readyAffinityVersion)));
+    PlannerContext context(@NotNull Context ctx, String query, Object[] params, BiFunction<Context, Query, PlannerContext> clo) { // Package private visibility for tests.
+        return clo.apply(Contexts.chain(ctx, config.getContext()), new Query(query, params));
+    }
+
+    private PlannerContext buildContext(@NotNull Context parent, @NotNull Query query) {
+        return PlannerContext.builder()
+            .logger(log)
+            .kernalContext(kernalContext)
+            .queryProcessor(this)
+            .parentContext(parent)
+            .query(query)
+            .schema(schemaHolder.schema())
+            .topologyVersion(readyAffinityVersion())
+            .distributionService(new TableDistributionServiceImpl(kernalContext))
+            .mappingService(new MappingServiceImpl(kernalContext))
+            .build();
     }
 
-    private QueryExecution prepare(Context ctx) {
+    private QueryExecution prepare(PlannerContext ctx) {
         return new DistributedExecution(ctx);
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/MappingServiceImpl.java
similarity index 68%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/MappingServiceImpl.java
index 71f8077..66c7a74 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/RegistryImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/MappingServiceImpl.java
@@ -20,52 +20,30 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
-import java.util.function.ToIntFunction;
-import org.apache.calcite.plan.Context;
-import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
-import org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
-import org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
+import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
-import org.apache.ignite.internal.processors.query.calcite.trait.AbstractDestinationFunctionFactory;
-import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
-import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
-import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
-import org.apache.ignite.internal.processors.query.calcite.type.RowType;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
 
 import static org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping.DEDUPLICATED;
 
 /**
  *
  */
-public class RegistryImpl implements DistributionRegistry, LocationRegistry {
+public class MappingServiceImpl implements MappingService {
     private final GridKernalContext ctx;
 
-    public RegistryImpl(GridKernalContext ctx) {
+    public MappingServiceImpl(GridKernalContext ctx) {
         this.ctx = ctx;
     }
 
-    @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
-        CacheGroupContext grp = ctx.cache().context().cacheContext(cacheId).group();
-
-        if (grp.isReplicated())
-            return IgniteDistributions.broadcast();
-
-        Object key = grp.affinity().similarAffinityKey();
-
-        return IgniteDistributions.hash(rowType.distributionKeys(), new AffinityFactory(cacheId, key));
-    }
-
     @Override public NodesMapping local() {
         return new NodesMapping(Collections.singletonList(ctx.discovery().localNode().id()), null, DEDUPLICATED);
     }
@@ -153,35 +131,4 @@ public class RegistryImpl implements DistributionRegistry, LocationRegistry {
         }
         return true;
     }
-
-    private final static class AffinityFactory extends AbstractDestinationFunctionFactory {
-        private final int cacheId;
-        private final Object key;
-
-        AffinityFactory(int cacheId, Object key) {
-            this.cacheId = cacheId;
-            this.key = key;
-        }
-
-        @Override public DestinationFunction create(Context ctx, NodesMapping mapping, ImmutableIntList keys) {
-            assert keys.size() == 1 && mapping != null && !F.isEmpty(mapping.assignments());
-
-            List<List<UUID>> assignments = mapping.assignments();
-
-            if (U.assertionsEnabled()) {
-                for (List<UUID> assignment : assignments) {
-                    assert F.isEmpty(assignment) || assignment.size() == 1;
-                }
-            }
-
-            ToIntFunction<Object> rowToPart = ctx.unwrap(GridKernalContext.class)
-                .cache().context().cacheContext(cacheId).affinity()::partition;
-
-            return row -> assignments.get(rowToPart.applyAsInt(((Object[]) row)[keys.getInt(0)]));
-        }
-
-        @Override public Object key() {
-            return key;
-        }
-    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/TableDistributionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/TableDistributionServiceImpl.java
new file mode 100644
index 0000000..20da342
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/cluster/TableDistributionServiceImpl.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.cluster;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.function.ToIntFunction;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.metadata.TableDistributionService;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
+import org.apache.ignite.internal.processors.query.calcite.trait.AbstractDestinationFunctionFactory;
+import org.apache.ignite.internal.processors.query.calcite.trait.DestinationFunction;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.type.RowType;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class TableDistributionServiceImpl implements TableDistributionService {
+    private final GridKernalContext ctx;
+
+    public TableDistributionServiceImpl(GridKernalContext ctx) {
+        this.ctx = ctx;
+    }
+
+    @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
+        CacheGroupContext grp = ctx.cache().context().cacheContext(cacheId).group();
+
+        if (grp.isReplicated())
+            return IgniteDistributions.broadcast();
+
+        Object key = grp.affinity().similarAffinityKey();
+
+        return IgniteDistributions.hash(rowType.distributionKeys(), new AffinityFactory(cacheId, key));
+    }
+
+    private final static class AffinityFactory extends AbstractDestinationFunctionFactory {
+        private final int cacheId;
+        private final Object key;
+
+        AffinityFactory(int cacheId, Object key) {
+            this.cacheId = cacheId;
+            this.key = key;
+        }
+
+        @Override public DestinationFunction create(PlannerContext ctx, NodesMapping mapping, ImmutableIntList keys) {
+            assert keys.size() == 1 && mapping != null && !F.isEmpty(mapping.assignments());
+
+            List<List<UUID>> assignments = mapping.assignments();
+
+            if (U.assertionsEnabled()) {
+                for (List<UUID> assignment : assignments) {
+                    assert F.isEmpty(assignment) || assignment.size() == 1;
+                }
+            }
+
+            ToIntFunction<Object> rowToPart = ctx.kernalContext()
+                .cache().context().cacheContext(cacheId).affinity()::partition;
+
+            return row -> assignments.get(rowToPart.applyAsInt(((Object[]) row)[keys.getInt(0)]));
+        }
+
+        @Override public Object key() {
+            return key;
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeService.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java
similarity index 96%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeService.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java
index 788011a..692b4fa 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeService.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/ExchangeProcessor.java
@@ -23,7 +23,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 /**
  *
  */
-public interface ExchangeService {
+public interface ExchangeProcessor {
     void register(Outbox outbox);
     void unregister(Outbox outbox);
     void send(GridCacheVersion queryId, long exchangeId, UUID nodeId, int batchId, List<?> rows);
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java
index 47351e2..b1cf688 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exchange/Outbox.java
@@ -43,7 +43,7 @@ public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Sink<T>
     private final Collection<UUID> targets;
     private final DestinationFunction function;
 
-    private ExchangeService srvc;
+    private ExchangeProcessor srvc;
 
     protected Outbox(GridCacheVersion queryId, long exchangeId, Collection<UUID> targets, DestinationFunction function) {
         super(Sink.noOp());
@@ -54,6 +54,14 @@ public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Sink<T>
         this.function = function;
     }
 
+    public void init(ExchangeProcessor srvc) {
+        this.srvc = srvc;
+
+        srvc.register(this);
+
+        signal();
+    }
+
     public void acknowledge(UUID nodeId, int batchId) {
         perNode.get(nodeId).acknowledge(batchId);
     }
@@ -91,14 +99,6 @@ public class Outbox<T> extends AbstractNode<T> implements SingleNode<T>, Sink<T>
         return true;
     }
 
-    public void init(ExchangeService srvc) {
-        this.srvc = srvc;
-
-        srvc.register(this);
-
-        signal();
-    }
-
     @Override public void end() {
         for (UUID node : targets)
             perNode.computeIfAbsent(node, Destination::new).end();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingService.java
similarity index 96%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingService.java
index bf62302..9472cd1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/LocationRegistry.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/MappingService.java
@@ -21,7 +21,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 /**
  *
  */
-public interface LocationRegistry {
+public interface MappingService {
     NodesMapping local(); // returns local node with single partition
     NodesMapping random(AffinityTopologyVersion topVer); // returns random distribution, partitions count depends on nodes count
     NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer); // returns cache distribution
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/TableDistributionService.java
similarity index 95%
rename from modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java
rename to modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/TableDistributionService.java
index 3a20908..ad710aa 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/DistributionRegistry.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/TableDistributionService.java
@@ -22,6 +22,6 @@ import org.apache.ignite.internal.processors.query.calcite.type.RowType;
 /**
  *
  */
-public interface DistributionRegistry {
+public interface TableDistributionService {
     DistributionTrait distribution(int cacheId, RowType rowType);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java
index 49b57ad..06aaa024 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DataContextImpl.java
@@ -20,18 +20,15 @@ package org.apache.ignite.internal.processors.query.calcite.prepare;
 import java.util.Map;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.linq4j.QueryProvider;
-import org.apache.calcite.plan.Context;
 import org.apache.calcite.schema.SchemaPlus;
-import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
 
 /**
  *
  */
 class DataContextImpl implements DataContext {
     /** */
-    private final JavaTypeFactoryImpl typeFactory;
+    private final JavaTypeFactory typeFactory;
 
     /** */
     private final SchemaPlus schema;
@@ -46,10 +43,11 @@ class DataContextImpl implements DataContext {
      * @param params Parameters.
      * @param ctx Query context.
      */
-    DataContextImpl(Map<String, Object> params, Context ctx) {
-        typeFactory = new JavaTypeFactoryImpl(ctx.unwrap(CalciteQueryProcessor.class).config().getTypeSystem());
-        schema = ctx.unwrap(SchemaPlus.class);
-        queryProvider = ctx.unwrap(QueryProvider.class);
+    DataContextImpl(Map<String, Object> params, PlannerContext ctx) {
+        typeFactory = ctx.typeFactory();
+        schema = ctx.schema();
+        queryProvider = ctx.queryProvider();
+
         this.params = params;
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java
index fdaee5a..192e686 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/DistributedExecution.java
@@ -19,9 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite.prepare;
 
 import java.util.Arrays;
 import java.util.List;
-import java.util.Objects;
 import org.apache.calcite.linq4j.Linq4j;
-import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.ConventionTraitDef;
 import org.apache.calcite.plan.RelTraitDef;
 import org.apache.calcite.plan.RelTraitSet;
@@ -47,19 +45,19 @@ import org.apache.ignite.internal.processors.query.calcite.util.ListFieldsQueryC
  */
 public class DistributedExecution implements QueryExecution {
     /** */
-    private final Context ctx;
+    private final PlannerContext ctx;
 
     /**
      * @param ctx Query context.
      */
-    public DistributedExecution(Context ctx) {
+    public DistributedExecution(PlannerContext ctx) {
         this.ctx = ctx;
     }
 
     /** {@inheritDoc} */
     @Override public FieldsQueryCursor<List<?>> execute() {
-        CalciteQueryProcessor proc = Objects.requireNonNull(ctx.unwrap(CalciteQueryProcessor.class));
-        Query query = Objects.requireNonNull(ctx.unwrap(Query.class));
+        CalciteQueryProcessor proc = ctx.queryProcessor();
+        Query query = ctx.query();
 
         RelTraitDef[] traitDefs = {
             RelDistributionTraitDef.INSTANCE,
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
index 2462785..2506de8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/IgnitePlanner.java
@@ -80,6 +80,7 @@ import org.apache.ignite.internal.processors.query.calcite.splitter.QueryPlan;
 import org.apache.ignite.internal.processors.query.calcite.splitter.Splitter;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeSystem;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
  *
@@ -124,6 +125,8 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
             .typeSystem(RelDataTypeSystem.class, IgniteTypeSystem.DEFAULT);
 
         typeFactory = new IgniteTypeFactory(typeSystem);
+
+        Commons.plannerContext(context).planner(this);
     }
 
     private CalciteConnectionConfig connConfig() {
@@ -302,7 +305,7 @@ public class IgnitePlanner implements Planner, RelOptTable.ViewExpander {
         ready();
 
         RelTraitSet toTraits = targetTraits.simplify();
-        RuleSet rules = plannerPhase.getRules(context);
+        RuleSet rules = plannerPhase.getRules(Commons.plannerContext(context));
 
         input.accept(new MetaDataProviderModifier(metadataProvider));
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerContext.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerContext.java
new file mode 100644
index 0000000..dc20880
--- /dev/null
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/PlannerContext.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.prepare;
+
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.linq4j.QueryProvider;
+import org.apache.calcite.plan.Context;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
+import org.apache.ignite.internal.processors.query.calcite.exchange.ExchangeProcessor;
+import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
+import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.metadata.TableDistributionService;
+import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
+import org.apache.ignite.internal.processors.query.calcite.type.RowType;
+
+/**
+ *
+ */
+public final class PlannerContext implements Context {
+    private final Context parentContext;
+    private final Query query;
+    private final AffinityTopologyVersion topologyVersion;
+    private final SchemaPlus schema;
+    private final IgniteLogger logger;
+    private final GridKernalContext kernalContext;
+    private final CalciteQueryProcessor queryProcessor;
+    private final MappingService mappingService;
+    private final TableDistributionService distributionService;
+    private final ExchangeProcessor exchangeProcessor;
+
+    private IgnitePlanner planner;
+
+    private PlannerContext(Context parentContext, Query query, AffinityTopologyVersion topologyVersion,
+        SchemaPlus schema, IgniteLogger logger, GridKernalContext kernalContext, CalciteQueryProcessor queryProcessor, MappingService mappingService,
+        TableDistributionService distributionService, ExchangeProcessor exchangeProcessor) {
+        this.parentContext = parentContext;
+        this.query = query;
+        this.topologyVersion = topologyVersion;
+        this.schema = schema;
+        this.logger = logger;
+        this.kernalContext = kernalContext;
+        this.queryProcessor = queryProcessor;
+        this.mappingService = mappingService;
+        this.distributionService = distributionService;
+        this.exchangeProcessor = exchangeProcessor;
+    }
+
+    public Query query() {
+        return query;
+    }
+
+    public AffinityTopologyVersion topologyVersion() {
+        return topologyVersion;
+    }
+
+    public SchemaPlus schema() {
+        return schema;
+    }
+
+    public IgniteLogger logger() {
+        return logger;
+    }
+
+    public GridKernalContext kernalContext() {
+        return kernalContext;
+    }
+
+    public CalciteQueryProcessor queryProcessor() {
+        return queryProcessor;
+    }
+
+    void planner(IgnitePlanner planner) {
+        this.planner = planner;
+    }
+
+    public IgnitePlanner planner() {
+        return planner;
+    }
+
+    public MappingService mappingService() {
+        return mappingService;
+    }
+
+    public TableDistributionService distributionService() {
+        return distributionService;
+    }
+
+    public ExchangeProcessor exchangeProcessor() {
+        return exchangeProcessor;
+    }
+
+    // Helper methods
+
+    public JavaTypeFactory typeFactory() {
+        return planner.getTypeFactory();
+    }
+
+    public NodesMapping mapForLocal() {
+        return mappingService.local();
+    }
+
+    public NodesMapping mapForRandom(AffinityTopologyVersion topVer) {
+        return mappingService.random(topVer);
+    }
+
+    public NodesMapping mapForCache(int cacheId, AffinityTopologyVersion topVer) {
+        return mappingService.distributed(cacheId, topVer);
+    }
+
+    public DistributionTrait distributionTrait(int cacheId, RowType rowType) {
+        return distributionService.distribution(cacheId, rowType);
+    }
+
+    public QueryProvider queryProvider() {
+        return null; // TODO
+    }
+
+    @Override public <C> C unwrap(Class<C> aClass) {
+        if (aClass == getClass())
+            return aClass.cast(this);
+
+        return parentContext.unwrap(aClass);
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+        private Context parentContext;
+        private Query query;
+        private AffinityTopologyVersion topologyVersion;
+        private SchemaPlus schema;
+        private IgniteLogger logger;
+        private GridKernalContext kernalContext;
+        private CalciteQueryProcessor queryProcessor;
+        private MappingService mappingService;
+        private TableDistributionService distributionService;
+        private ExchangeProcessor exchangeProcessor;
+
+        public Builder parentContext(Context parentContext) {
+            this.parentContext = parentContext;
+            return this;
+        }
+
+        public Builder query(Query query) {
+            this.query = query;
+            return this;
+        }
+
+        public Builder topologyVersion(AffinityTopologyVersion topologyVersion) {
+            this.topologyVersion = topologyVersion;
+            return this;
+        }
+
+        public Builder schema(SchemaPlus schema) {
+            this.schema = schema;
+            return this;
+        }
+
+        public Builder logger(IgniteLogger logger) {
+            this.logger = logger;
+            return this;
+        }
+
+        public Builder kernalContext(GridKernalContext kernalContext) {
+            this.kernalContext = kernalContext;
+            return this;
+        }
+
+        public Builder queryProcessor(CalciteQueryProcessor queryProcessor) {
+            this.queryProcessor = queryProcessor;
+            return this;
+        }
+
+        public Builder mappingService(MappingService mappingService) {
+            this.mappingService = mappingService;
+            return this;
+        }
+
+        public Builder distributionService(TableDistributionService distributionService) {
+            this.distributionService = distributionService;
+            return this;
+        }
+
+        public Builder exchangeProcessor(ExchangeProcessor exchangeProcessor) {
+            this.exchangeProcessor = exchangeProcessor;
+            return this;
+        }
+
+        public PlannerContext build() {
+            return new PlannerContext(parentContext, query, topologyVersion, schema, logger, kernalContext, queryProcessor, mappingService, distributionService, exchangeProcessor);
+        }
+    }
+}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
index 12b7b99..c529aee 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/IgniteTableScan.java
@@ -24,6 +24,7 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo;
 import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.calcite.util.RelImplementor;
 
 public final class IgniteTableScan extends TableScan implements IgniteRel {
@@ -44,6 +45,6 @@ public final class IgniteTableScan extends TableScan implements IgniteRel {
 
   public FragmentInfo fragmentInfo() {
     return getTable().unwrap(IgniteTable.class)
-        .fragmentInfo(getCluster().getPlanner().getContext());
+        .fragmentInfo(Commons.plannerContext(getCluster().getPlanner().getContext()));
   }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
index d089339..cf514d5 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/IgniteRules.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.query.calcite.rule;
 
 import com.google.common.collect.ImmutableList;
 import java.util.List;
-import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.volcano.AbstractConverter;
 import org.apache.calcite.rel.rules.AggregateExpandDistinctAggregatesRule;
@@ -62,6 +61,7 @@ import org.apache.calcite.rel.rules.UnionMergeRule;
 import org.apache.calcite.rel.rules.UnionPullUpConstantsRule;
 import org.apache.calcite.rel.rules.UnionToDistinctRule;
 import org.apache.calcite.rel.rules.ValuesReduceRule;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
 
 /**
  *
@@ -149,7 +149,7 @@ public class IgniteRules {
         IgniteProjectRule.INSTANCE,
         IgniteJoinRule.INSTANCE);
 
-    public static List<RelOptRule> logicalRules(Context ctx) {
+    public static List<RelOptRule> logicalRules(PlannerContext ctx) {
         return ImmutableList.<RelOptRule>builder()
             .addAll(BASE_RULES)
             .addAll(ABSTRACT_RULES)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java
index aa82187..24938a1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rule/PlannerPhase.java
@@ -17,9 +17,9 @@
 
 package org.apache.ignite.internal.processors.query.calcite.rule;
 
-import org.apache.calcite.plan.Context;
 import org.apache.calcite.tools.RuleSet;
 import org.apache.calcite.tools.RuleSets;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
 
 /**
  *
@@ -27,14 +27,14 @@ import org.apache.calcite.tools.RuleSets;
 public enum PlannerPhase {
     /** */
     SUBQUERY_REWRITE("Sub-queries rewrites") {
-        @Override public RuleSet getRules(Context ctx) {
+        @Override public RuleSet getRules(PlannerContext ctx) {
             return RuleSets.ofList(IgniteRules.SUBQUERY_REWRITE_RULES);
         }
     },
 
     /** */
     LOGICAL("Logical planning") {
-        @Override public RuleSet getRules(Context ctx) {
+        @Override public RuleSet getRules(PlannerContext ctx) {
             return RuleSets.ofList(IgniteRules.logicalRules(ctx));
         }
     };
@@ -45,5 +45,5 @@ public enum PlannerPhase {
         this.description = description;
     }
 
-    public abstract RuleSet getRules(Context ctx);
+    public abstract RuleSet getRules(PlannerContext ctx);
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
index b14ea3b..805c455 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/schema/IgniteTable.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.query.calcite.schema;
 
-import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptTable;
 import org.apache.calcite.plan.RelTraitSet;
@@ -26,15 +25,14 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.schema.TranslatableTable;
 import org.apache.calcite.schema.impl.AbstractTable;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
 import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo;
-import org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteTableScan;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.type.RowType;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 
 /** */
@@ -71,30 +69,19 @@ public class IgniteTable extends AbstractTable implements TranslatableTable {
     /** {@inheritDoc} */
     @Override public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
         RelOptCluster cluster = context.getCluster();
+        PlannerContext ctx = Commons.plannerContext(cluster.getPlanner().getContext());
         RelTraitSet traitSet = cluster.traitSet().replace(IgniteRel.IGNITE_CONVENTION)
-                .replaceIf(DistributionTraitDef.INSTANCE, () -> distributionTrait(cluster.getPlanner().getContext()));
+                .replaceIf(DistributionTraitDef.INSTANCE, () -> distributionTrait(ctx));
         return new IgniteTableScan(cluster, traitSet, relOptTable);
     }
 
-    public DistributionTrait distributionTrait(Context context) {
-        return distributionRegistry(context).distribution(CU.cacheId(cacheName), rowType);
+    public DistributionTrait distributionTrait(PlannerContext context) {
+        return Commons.plannerContext(context).distributionTrait(CU.cacheId(cacheName), rowType);
     }
 
-    public FragmentInfo fragmentInfo(Context ctx) {
-        int cacheId = CU.cacheId(cacheName);
+    public FragmentInfo fragmentInfo(PlannerContext ctx) {
+        PlannerContext ctx0 = Commons.plannerContext(ctx);
 
-        return new FragmentInfo(locationRegistry(ctx).distributed(cacheId, topologyVersion(ctx)));
-    }
-
-    private LocationRegistry locationRegistry(Context ctx) {
-        return ctx.unwrap(LocationRegistry.class);
-    }
-
-    public DistributionRegistry distributionRegistry(Context ctx) {
-        return ctx.unwrap(DistributionRegistry.class);
-    }
-
-    private AffinityTopologyVersion topologyVersion(Context ctx) {
-        return ctx.unwrap(AffinityTopologyVersion.class);
+        return new FragmentInfo(ctx0.mapForCache(CU.cacheId(cacheName), ctx0.topologyVersion()));
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
index d12c73a..45c4591 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Fragment.java
@@ -17,17 +17,14 @@
 package org.apache.ignite.internal.processors.query.calcite.splitter;
 
 import com.google.common.collect.ImmutableList;
-import java.util.Objects;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.calcite.plan.Context;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.Pair;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentInfo;
 import org.apache.ignite.internal.processors.query.calcite.metadata.IgniteMdFragmentInfo;
-import org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
@@ -49,11 +46,12 @@ public class Fragment implements Source {
         this.root = root;
     }
 
-    public void init(Context ctx, RelMetadataQuery mq) {
+    public void init(PlannerContext ctx, RelMetadataQuery mq) {
+
         FragmentInfo info = IgniteMdFragmentInfo.fragmentInfo(root, mq);
 
         if (info.mapping() == null)
-            mapping = remote() ? registry(ctx).random(topologyVersion(ctx)) : registry(ctx).local();
+            mapping = remote() ? ctx.mapForRandom(ctx.topologyVersion()) : ctx.mapForLocal();
         else
             mapping = info.mapping().deduplicate();
 
@@ -73,7 +71,7 @@ public class Fragment implements Source {
         return exchangeId;
     }
 
-    @Override public void init(NodesMapping mapping, DistributionTrait distribution, Context ctx, RelMetadataQuery mq) {
+    @Override public void init(NodesMapping mapping, DistributionTrait distribution, PlannerContext ctx, RelMetadataQuery mq) {
         assert remote();
 
         ((Sender) root).init(new TargetImpl(exchangeId, mapping, distribution));
@@ -92,12 +90,4 @@ public class Fragment implements Source {
     private boolean remote() {
         return root instanceof Sender;
     }
-
-    private LocationRegistry registry(Context ctx) {
-        return Objects.requireNonNull(ctx.unwrap(LocationRegistry.class));
-    }
-
-    private AffinityTopologyVersion topologyVersion(Context ctx) {
-        return Objects.requireNonNull(ctx.unwrap(AffinityTopologyVersion.class));
-    }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
index c03879b..b0369bd 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/QueryPlan.java
@@ -17,13 +17,13 @@
 package org.apache.ignite.internal.processors.query.calcite.splitter;
 
 import java.util.List;
-import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.calcite.metadata.OptimisticPlanningException;
 import org.apache.ignite.internal.processors.query.calcite.metadata.RelMetadataQueryEx;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.Receiver;
 import org.apache.ignite.internal.processors.query.calcite.rel.Sender;
 import org.apache.ignite.internal.processors.query.calcite.util.Edge;
@@ -39,7 +39,7 @@ public class QueryPlan {
         this.fragments = fragments;
     }
 
-    public void init(Context ctx) {
+    public void init(PlannerContext ctx) {
         int i = 0;
 
         RelMetadataQueryEx mq = RelMetadataQueryEx.instance();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java
index 773dee1..e0108b7 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/splitter/Source.java
@@ -16,9 +16,9 @@
 
 package org.apache.ignite.internal.processors.query.calcite.splitter;
 
-import org.apache.calcite.plan.Context;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTrait;
 
 /**
@@ -41,7 +41,7 @@ public interface Source {
      * @param ctx Context.
      * @param mq Metadata query instance.
      */
-    default void init(NodesMapping mapping, DistributionTrait distribution, Context ctx, RelMetadataQuery mq) {
+    default void init(NodesMapping mapping, DistributionTrait distribution, PlannerContext ctx, RelMetadataQuery mq) {
         // No-op.
     }
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
index 2dcfe33..30ce8b8 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/AllTargetsFactory.java
@@ -19,9 +19,9 @@ package org.apache.ignite.internal.processors.query.calcite.trait;
 import java.io.ObjectStreamException;
 import java.util.List;
 import java.util.UUID;
-import org.apache.calcite.plan.Context;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
 
 /**
  *
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping
 public final class AllTargetsFactory extends AbstractDestinationFunctionFactory {
     public static final DestinationFunctionFactory INSTANCE = new AllTargetsFactory();
 
-    @Override public DestinationFunction create(Context ctx, NodesMapping m, ImmutableIntList k) {
+    @Override public DestinationFunction create(PlannerContext ctx, NodesMapping m, ImmutableIntList k) {
         List<UUID> nodes = m.nodes();
 
         return r -> nodes;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
index 587c172..d12ec15 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/DestinationFunctionFactory.java
@@ -17,15 +17,15 @@
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
 import java.io.Serializable;
-import org.apache.calcite.plan.Context;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
 
 /**
  *
  */
 public interface DestinationFunctionFactory extends Serializable {
-    DestinationFunction create(Context ctx, NodesMapping mapping, ImmutableIntList keys);
+    DestinationFunction create(PlannerContext ctx, NodesMapping mapping, ImmutableIntList keys);
 
     Object key();
 }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
index 3c2420b..863f93c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/HashFunctionFactory.java
@@ -20,9 +20,9 @@ import java.io.ObjectStreamException;
 import java.util.List;
 import java.util.UUID;
 import java.util.function.ToIntFunction;
-import org.apache.calcite.plan.Context;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 public final class HashFunctionFactory extends AbstractDestinationFunctionFactory {
     public static final DestinationFunctionFactory INSTANCE = new HashFunctionFactory();
 
-    @Override public DestinationFunction create(Context ctx, NodesMapping m, ImmutableIntList k) {
+    @Override public DestinationFunction create(PlannerContext ctx, NodesMapping m, ImmutableIntList k) {
         assert m != null && !F.isEmpty(m.assignments());
 
         int[] fields = k.toIntArray();
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
index d8495b6..7086277 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/NoOpFactory.java
@@ -17,9 +17,9 @@
 package org.apache.ignite.internal.processors.query.calcite.trait;
 
 import java.io.ObjectStreamException;
-import org.apache.calcite.plan.Context;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
 
 /**
  *
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping
 public final class NoOpFactory extends AbstractDestinationFunctionFactory {
     public static final DestinationFunctionFactory INSTANCE = new NoOpFactory();
 
-    @Override public DestinationFunction create(Context ctx, NodesMapping m, ImmutableIntList k) {
+    @Override public DestinationFunction create(PlannerContext ctx, NodesMapping m, ImmutableIntList k) {
         return null;
     }
 
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
index 78013d3..8a643c3 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/RandomTargetFactory.java
@@ -21,9 +21,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
-import org.apache.calcite.plan.Context;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
 
 /**
  *
@@ -31,7 +31,7 @@ import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping
 public final class RandomTargetFactory extends AbstractDestinationFunctionFactory {
     public static final DestinationFunctionFactory INSTANCE = new RandomTargetFactory();
 
-    @Override public DestinationFunction create(Context ctx, NodesMapping m, ImmutableIntList k) {
+    @Override public DestinationFunction create(PlannerContext ctx, NodesMapping m, ImmutableIntList k) {
         List<UUID> nodes = m.nodes();
 
         return r -> Collections.singletonList(nodes.get(ThreadLocalRandom.current().nextInt(nodes.size())));
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
index 4d631fc..8a8b27c 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/trait/SingleTargetFactory.java
@@ -21,9 +21,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
-import org.apache.calcite.plan.Context;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
 import org.apache.ignite.internal.util.typedef.F;
 
 /**
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.util.typedef.F;
 public final class SingleTargetFactory extends AbstractDestinationFunctionFactory {
     public static final DestinationFunctionFactory INSTANCE = new SingleTargetFactory();
 
-    @Override public DestinationFunction create(Context ctx, NodesMapping m, ImmutableIntList k) {
+    @Override public DestinationFunction create(PlannerContext ctx, NodesMapping m, ImmutableIntList k) {
         List<UUID> nodes = Collections.singletonList(Objects.requireNonNull(F.first(m.nodes())));
 
         return r -> nodes;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
index c6d6a6c..3a461cf 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/Commons.java
@@ -28,7 +28,6 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.Function;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
@@ -42,11 +41,11 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.QueryContext;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.type.RowType;
 import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -58,24 +57,6 @@ public final class Commons {
         return ctx == null ? Contexts.empty() : Contexts.of(ctx.unwrap(Object[].class));
     }
 
-    public static <T> @Nullable T provided(Context ctx, Class<T> paramType, Supplier<T> paramSrc) {
-        T param = ctx.unwrap(paramType);
-
-        if (param != null)
-            return null; // Provided by parent context.
-
-        return paramSrc.get();
-    }
-
-    public static <T> T contextParam(Context ctx, Class<T> paramType, Supplier<T> paramSrc) {
-        T param = ctx.unwrap(paramType);
-
-        if (param != null)
-            return param;
-
-        return paramSrc.get();
-    }
-
     /** */
     public static RowType rowType(GridQueryTypeDescriptor desc) {
         RowType.Builder b = RowType.builder();
@@ -189,4 +170,8 @@ public final class Commons {
 
         return set;
     }
+
+    public static PlannerContext plannerContext(Context ctx) {
+        return Objects.requireNonNull(ctx.unwrap(PlannerContext.class));
+    }
 }
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index 6c04c1e..644633b 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.calcite.plan.Context;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.ConventionTraitDef;
@@ -35,10 +34,11 @@ import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.tools.Frameworks;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.query.calcite.metadata.DistributionRegistry;
-import org.apache.ignite.internal.processors.query.calcite.metadata.LocationRegistry;
+import org.apache.ignite.internal.processors.query.calcite.metadata.MappingService;
 import org.apache.ignite.internal.processors.query.calcite.metadata.NodesMapping;
+import org.apache.ignite.internal.processors.query.calcite.metadata.TableDistributionService;
 import org.apache.ignite.internal.processors.query.calcite.prepare.IgnitePlanner;
+import org.apache.ignite.internal.processors.query.calcite.prepare.PlannerContext;
 import org.apache.ignite.internal.processors.query.calcite.prepare.Query;
 import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.rule.PlannerPhase;
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTra
 import org.apache.ignite.internal.processors.query.calcite.trait.DistributionTraitDef;
 import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import org.apache.ignite.internal.processors.query.calcite.type.RowType;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
@@ -68,18 +69,17 @@ import org.junit.Test;
 //@WithSystemProperty(key = "calcite.debug", value = "true")
 public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
+    private static GridTestKernalContext kernalContext;
     private static CalciteQueryProcessor proc;
     private static SchemaPlus schema;
-
-    private static TestRegistry registry;
     private static List<UUID> nodes;
 
     @BeforeClass
     public static void setupClass() {
+        kernalContext = new GridTestKernalContext(log);
         proc = new CalciteQueryProcessor();
-
         proc.setLogger(log);
-        proc.start(new GridTestKernalContext(log));
+        proc.start(kernalContext);
 
         IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
 
@@ -123,8 +123,6 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         for (int i = 0; i < 4; i++) {
             nodes.add(UUID.randomUUID());
         }
-
-        registry = new TestRegistry();
     }
 
     @Test
@@ -136,7 +134,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             "ON d.projectId = p.id0 + 1" +
             "WHERE (d.projectId + 1) > ?";
 
-        Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, this::context);
 
         assertNotNull(ctx);
 
@@ -149,7 +147,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = ctx.unwrap(Query.class);
+            Query query = Commons.plannerContext(ctx).query();
 
             assertNotNull(query);
 
@@ -175,7 +173,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             "ON d.projectId = p.id0 " +
             "WHERE (d.projectId + 1) > ?";
 
-        Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, this::context);
 
         assertNotNull(ctx);
 
@@ -188,7 +186,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = ctx.unwrap(Query.class);
+            Query query = Commons.plannerContext(ctx).query();
 
             assertNotNull(query);
 
@@ -212,7 +210,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         String sql = "SELECT d.id, (SELECT p.name FROM Project p WHERE p.id = d.id) name, d.projectId " +
             "FROM Developer d";
 
-        Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, this::context);
 
         assertNotNull(ctx);
 
@@ -225,7 +223,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = ctx.unwrap(Query.class);
+            Query query = Commons.plannerContext(ctx).query();
 
             assertNotNull(query);
 
@@ -251,7 +249,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             "ON d.projectId = p.id0 " +
             "WHERE (d.projectId + 1) > ?";
 
-        Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, this::context);
 
         assertNotNull(ctx);
 
@@ -264,7 +262,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = ctx.unwrap(Query.class);
+            Query query = Commons.plannerContext(ctx).query();
 
             assertNotNull(query);
 
@@ -297,7 +295,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             "ON d.projectId = p.id0 " +
             "WHERE (d.projectId + 1) > ?";
 
-        Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, this::context);
 
         assertNotNull(ctx);
 
@@ -311,7 +309,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = ctx.unwrap(Query.class);
+            Query query = Commons.plannerContext(ctx).query();
 
             assertNotNull(query);
 
@@ -357,7 +355,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             "ON d.id = p.id0 " +
             "WHERE (d.projectId + 1) > ?";
 
-        Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, this::context);
 
         assertNotNull(ctx);
 
@@ -371,7 +369,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = ctx.unwrap(Query.class);
+            Query query = Commons.plannerContext(ctx).query();
 
             assertNotNull(planner);
 
@@ -433,7 +431,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             "ON d.id = p.id0 " +
             "WHERE (d.projectId + 1) > ?";
 
-        Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, this::context);
 
         assertNotNull(ctx);
 
@@ -447,7 +445,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = ctx.unwrap(Query.class);
+            Query query = Commons.plannerContext(ctx).query();
 
             assertNotNull(planner);
 
@@ -497,10 +495,20 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             "ON d.id = p.id0 " +
             "WHERE (d.projectId + 1) > ?";
 
-        TestRegistry registry = new TestRegistry(){
+        TableDistributionService ds = new TableDistributionService(){
             @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
                 return IgniteDistributions.broadcast();
             }
+        };
+
+        MappingService ms = new MappingService() {
+            @Override public NodesMapping random(AffinityTopologyVersion topVer) {
+                return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
+            }
+
+            @Override public NodesMapping local() {
+                return new NodesMapping(select(nodes, 0), null, (byte) 0);
+            }
 
             @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) {
                 if (cacheId == CU.cacheId("Developer"))
@@ -512,9 +520,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             }
         };
 
-
-        Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
-
+        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
         assertNotNull(ctx);
 
         RelTraitDef[] traitDefs = {
@@ -527,7 +533,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = ctx.unwrap(Query.class);
+            Query query = Commons.plannerContext(ctx).query();
 
             assertNotNull(planner);
 
@@ -577,13 +583,23 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             "ON d.id = p.id0 " +
             "WHERE (d.projectId + 1) > ?";
 
-        TestRegistry registry = new TestRegistry(){
+        TableDistributionService ds = new TableDistributionService(){
             @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
                 if (cacheId == CU.cacheId("Project"))
                     return IgniteDistributions.broadcast();
 
                 return IgniteDistributions.hash(rowType.distributionKeys());
             }
+        };
+
+        MappingService ms = new MappingService() {
+            @Override public NodesMapping random(AffinityTopologyVersion topVer) {
+                return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
+            }
+
+            @Override public NodesMapping local() {
+                return new NodesMapping(select(nodes, 0), null, (byte) 0);
+            }
 
             @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) {
                 if (cacheId == CU.cacheId("Developer"))
@@ -601,7 +617,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             }
         };
 
-        Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
 
         assertNotNull(ctx);
 
@@ -615,7 +631,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = ctx.unwrap(Query.class);
+            Query query = Commons.plannerContext(ctx).query();
 
             assertNotNull(planner);
 
@@ -665,13 +681,23 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             "ON d.projectId = p.id0 " +
             "WHERE (d.projectId + 1) > ?";
 
-        TestRegistry registry = new TestRegistry(){
+        TableDistributionService ds = new TableDistributionService(){
             @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
                 if (cacheId == CU.cacheId("Project"))
                     return IgniteDistributions.broadcast();
 
                 return IgniteDistributions.hash(rowType.distributionKeys());
             }
+        };
+
+        MappingService ms = new MappingService() {
+            @Override public NodesMapping random(AffinityTopologyVersion topVer) {
+                return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
+            }
+
+            @Override public NodesMapping local() {
+                return new NodesMapping(select(nodes, 0), null, (byte) 0);
+            }
 
             @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) {
                 if (cacheId == CU.cacheId("Developer"))
@@ -689,7 +715,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             }
         };
 
-        Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
 
         assertNotNull(ctx);
 
@@ -703,7 +729,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = ctx.unwrap(Query.class);
+            Query query = Commons.plannerContext(ctx).query();
 
             assertNotNull(planner);
 
@@ -753,10 +779,20 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             "ON d.projectId = p.ver0 " +
             "WHERE (d.projectId + 1) > ?";
 
-        TestRegistry registry = new TestRegistry(){
+        TableDistributionService ds = new TableDistributionService(){
             @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
                 return IgniteDistributions.broadcast();
             }
+        };
+
+        MappingService ms = new MappingService() {
+            @Override public NodesMapping random(AffinityTopologyVersion topVer) {
+                return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
+            }
+
+            @Override public NodesMapping local() {
+                return new NodesMapping(select(nodes, 0), null, (byte) 0);
+            }
 
             @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) {
                 if (cacheId == CU.cacheId("Developer"))
@@ -769,7 +805,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             }
         };
 
-        Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
 
         assertNotNull(ctx);
 
@@ -783,7 +819,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = ctx.unwrap(Query.class);
+            Query query = Commons.plannerContext(ctx).query();
 
             assertNotNull(planner);
 
@@ -834,13 +870,23 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             "WHERE (d.projectId + 1) > ?";
 
 
-        TestRegistry registry = new TestRegistry(){
+        TableDistributionService ds = new TableDistributionService(){
             @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
                 if (cacheId == CU.cacheId("Project"))
                     return IgniteDistributions.broadcast();
 
                 return IgniteDistributions.hash(rowType.distributionKeys());
             }
+        };
+
+        MappingService ms = new MappingService() {
+            @Override public NodesMapping random(AffinityTopologyVersion topVer) {
+                return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
+            }
+
+            @Override public NodesMapping local() {
+                return new NodesMapping(select(nodes, 0), null, (byte) 0);
+            }
 
             @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) {
                 if (cacheId == CU.cacheId("Developer"))
@@ -858,7 +904,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             }
         };
 
-        Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
 
         assertNotNull(ctx);
 
@@ -872,7 +918,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = ctx.unwrap(Query.class);
+            Query query = Commons.plannerContext(ctx).query();
 
             assertNotNull(planner);
 
@@ -923,13 +969,23 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             "WHERE (d.projectId + 1) > ?";
 
 
-        TestRegistry registry = new TestRegistry(){
+        TableDistributionService ds = new TableDistributionService() {
             @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
                 if (cacheId == CU.cacheId("Project"))
                     return IgniteDistributions.broadcast();
 
                 return IgniteDistributions.hash(rowType.distributionKeys());
             }
+        };
+
+        MappingService ms = new MappingService() {
+            @Override public NodesMapping random(AffinityTopologyVersion topVer) {
+                return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
+            }
+
+            @Override public NodesMapping local() {
+                return new NodesMapping(select(nodes, 0), null, (byte) 0);
+            }
 
             @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) {
                 if (cacheId == CU.cacheId("Developer"))
@@ -946,7 +1002,8 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
                 throw new AssertionError("Unexpected cache id:" + cacheId);
             }
         };
-        Context ctx = proc.context(Contexts.of(schema, registry, AffinityTopologyVersion.NONE), sql, new Object[]{2});
+
+        PlannerContext ctx = proc.context(Contexts.empty(), sql, new Object[]{2}, (c, q) -> context(c, q, ms, ds));
 
         assertNotNull(ctx);
 
@@ -960,7 +1017,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         try (IgnitePlanner planner = proc.planner(traitDefs, ctx)){
             assertNotNull(planner);
 
-            Query query = ctx.unwrap(Query.class);
+            Query query = Commons.plannerContext(ctx).query();
 
             assertNotNull(planner);
 
@@ -1011,40 +1068,58 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         return res;
     }
 
-    private static class TestRegistry implements LocationRegistry, DistributionRegistry {
-        private AtomicLong idGen = new AtomicLong();
+    private PlannerContext context(Context c, Query q) {
+        MappingService ms = new MappingService() {
+            @Override public NodesMapping random(AffinityTopologyVersion topVer) {
+                return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
+            }
 
-        @Override public NodesMapping random(AffinityTopologyVersion topVer) {
-            return new NodesMapping(select(nodes, 0,1,2,3), null, (byte) 0);
-        }
+            @Override public NodesMapping local() {
+                return new NodesMapping(select(nodes, 0), null, (byte) 0);
+            }
 
-        @Override public NodesMapping local() {
-            return new NodesMapping(select(nodes, 0), null, (byte) 0);
-        }
+            @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) {
+                if (cacheId == CU.cacheId("Developer"))
+                    return new NodesMapping(null, Arrays.asList(
+                        select(nodes, 0,1),
+                        select(nodes, 1,2),
+                        select(nodes, 2,0),
+                        select(nodes, 0,1),
+                        select(nodes, 1,2)
+                    ), NodesMapping.HAS_PARTITIONED_CACHES);
+                if (cacheId == CU.cacheId("Project"))
+                    return new NodesMapping(null, Arrays.asList(
+                        select(nodes, 0,1),
+                        select(nodes, 1,2),
+                        select(nodes, 2,0),
+                        select(nodes, 0,1),
+                        select(nodes, 1,2)
+                    ), NodesMapping.HAS_PARTITIONED_CACHES);
 
-        @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
-            return IgniteDistributions.hash(rowType.distributionKeys());
-        }
+                throw new AssertionError("Unexpected cache id:" + cacheId);
+            }
+        };
 
-        @Override public NodesMapping distributed(int cacheId, AffinityTopologyVersion topVer) {
-            if (cacheId == CU.cacheId("Developer"))
-                return new NodesMapping(null, Arrays.asList(
-                    select(nodes, 0,1),
-                    select(nodes, 1,2),
-                    select(nodes, 2,0),
-                    select(nodes, 0,1),
-                    select(nodes, 1,2)
-                ), NodesMapping.HAS_PARTITIONED_CACHES);
-            if (cacheId == CU.cacheId("Project"))
-                return new NodesMapping(null, Arrays.asList(
-                    select(nodes, 0,1),
-                    select(nodes, 1,2),
-                    select(nodes, 2,0),
-                    select(nodes, 0,1),
-                    select(nodes, 1,2)
-                ), NodesMapping.HAS_PARTITIONED_CACHES);
-
-            throw new AssertionError("Unexpected cache id:" + cacheId);
-        }
+        TableDistributionService ds = new TableDistributionService() {
+            @Override public DistributionTrait distribution(int cacheId, RowType rowType) {
+                return IgniteDistributions.hash(rowType.distributionKeys());
+            }
+        };
+
+        return context(c, q, ms, ds);
+    }
+
+    private PlannerContext context(Context parent, Query query, MappingService ms, TableDistributionService ds) {
+        return PlannerContext.builder()
+            .parentContext(parent)
+            .logger(log)
+            .kernalContext(kernalContext)
+            .queryProcessor(proc)
+            .query(query)
+            .schema(schema)
+            .topologyVersion(AffinityTopologyVersion.NONE)
+            .distributionService(ds)
+            .mappingService(ms)
+            .build();
     }
 }
\ No newline at end of file
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
index 89efa7c..a08142c 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exchange/OutboxTest.java
@@ -5,7 +5,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
-import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.util.ImmutableIntList;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.calcite.exec.Sink;
@@ -38,7 +37,7 @@ public class OutboxTest extends GridCommonAbstractTest {
         NodesMapping mapping = new NodesMapping(Collections.singletonList(nodeId), null, NodesMapping.DEDUPLICATED);
 
         targets = mapping.nodes();
-        func = SingleTargetFactory.INSTANCE.create(Contexts.empty(), mapping, ImmutableIntList.of());
+        func = SingleTargetFactory.INSTANCE.create(null, mapping, ImmutableIntList.of());
     }
 
 
@@ -100,7 +99,7 @@ public class OutboxTest extends GridCommonAbstractTest {
         assertEquals(EndMarker.INSTANCE, F.last(exch.lastBatch));
     }
 
-    private static class TestExchangeService implements ExchangeService {
+    private static class TestExchangeService implements ExchangeProcessor {
         private boolean registered;
         private boolean unregistered;
         private List<Integer> ids = new ArrayList<>();


Mime
View raw message