ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [38/43] ignite git commit: IEP-4 Baseline topology for persistent caches (Phase 1) Contributed by: Dmitriy Govorukhin <dmitriy.govorukhin@gmail.com> Dmitry Pavlov <dpavlov.spb@gmail.com> Eduard Shangareev <eduard.shangareev@gmail.com> Ily
Date Wed, 17 Jan 2018 12:23:36 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
index 8eaa003..cf331f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
@@ -17,73 +17,643 @@
 
 package org.apache.ignite.internal.commandline;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import org.apache.ignite.internal.client.GridClient;
+import org.apache.ignite.internal.client.GridClientAuthenticationException;
+import org.apache.ignite.internal.client.GridClientClosedException;
 import org.apache.ignite.internal.client.GridClientClusterState;
+import org.apache.ignite.internal.client.GridClientCompute;
 import org.apache.ignite.internal.client.GridClientConfiguration;
+import org.apache.ignite.internal.client.GridClientDisconnectedException;
 import org.apache.ignite.internal.client.GridClientException;
 import org.apache.ignite.internal.client.GridClientFactory;
+import org.apache.ignite.internal.client.GridClientHandshakeException;
+import org.apache.ignite.internal.client.GridClientNode;
+import org.apache.ignite.internal.client.GridServerUnreachableException;
+import org.apache.ignite.internal.client.impl.connection.GridClientConnectionResetException;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.visor.VisorTaskArgument;
+import org.apache.ignite.internal.visor.baseline.VisorBaselineNode;
+import org.apache.ignite.internal.visor.baseline.VisorBaselineOperation;
+import org.apache.ignite.internal.visor.baseline.VisorBaselineTask;
+import org.apache.ignite.internal.visor.baseline.VisorBaselineTaskArg;
+import org.apache.ignite.internal.visor.baseline.VisorBaselineTaskResult;
+
+import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
+import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
+import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.ADD;
+import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.COLLECT;
+import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.REMOVE;
+import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.SET;
+import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.VERSION;
 
 /**
- *
+ * Class that executes several commands passed via command line.
  */
 public class CommandHandler {
+    /** Default host to connect to. */
+    private static final String DFLT_HOST = "127.0.0.1";
+
+    /** Default port to connect to. */
+    private static final String DFLT_PORT = "11211";
+
+    /** Command line option that prints usage help. */
+    private static final String CMD_HELP = "--help";
+
+    /** Command line option followed by the host to connect to. */
+    private static final String CMD_HOST = "--host";
+
+    /** Command line option followed by the port to connect to. */
+    private static final String CMD_PORT = "--port";
+
+    /** Command that activates the cluster. */
+    private static final String CMD_ACTIVATE = "--activate";
+
+    /** Command that deactivates the cluster. */
+    private static final String CMD_DEACTIVATE = "--deactivate";
+
+    /** Command that prints the current cluster state. */
+    private static final String CMD_STATE = "--state";
+
+    /** Command that prints or changes the baseline topology. */
+    private static final String CMD_BASE_LINE = "--baseline";
+
+    /** Baseline action: add nodes to the baseline topology. */
+    private static final String BASELINE_ADD = "add";
+
+    /** Baseline action: remove nodes from the baseline topology. */
+    private static final String BASELINE_REMOVE = "remove";
+
+    /** Baseline action: set the baseline topology to the given nodes. */
+    private static final String BASELINE_SET = "set";
+
+    /** Baseline action: set the baseline topology based on a topology version. */
+    private static final String BASELINE_SET_VERSION = "version";
+
+    /** Separator line printed between sections of console output. */
+    private static final String DELIM = "--------------------------------------------------------------------------------";
+
+    /** Exit code: successful execution. */
+    public static final int EXIT_CODE_OK = 0;
+
+    /** Exit code: invalid arguments. */
+    public static final int EXIT_CODE_INVALID_ARGUMENTS = 1;
+
+    /** Exit code: connection failed. */
+    public static final int EXIT_CODE_CONNECTION_FAILED = 2;
+
+    /** Exit code: authentication failed. */
+    public static final int ERR_AUTHENTICATION_FAILED = 3;
+
+    /** Exit code: unexpected error. */
+    public static final int EXIT_CODE_UNEXPECTED_ERROR = 4;
+
+    /**
+     * Prints the specified string to standard output, followed by a line break.
+     *
+     * @param s String to output.
+     */
+    private void log(String s) {
+        System.out.println(s);
+    }
+
     /**
-     * @param args Args.
+     * Output empty line.
      */
-    public static void main(String[] args) throws GridClientException {
-        String host = "127.0.0.1";
-        String port = "11211";
-        Boolean activate = null;
+    private void nl() {
+        System.out.println("");
+    }
+
+    /**
+     * Print error to console.
+     *
+     * @param errCode Error code to return.
+     * @param s Optional message.
+     * @param e Error to print.
+     */
+    private int error(int errCode, String s, Throwable e) {
+        if (!F.isEmpty(s))
+            log(s);
 
-        if (args.length == 1 && "--help".equals(args[0])){
-            System.out.println("Example: --host {ip} --port {port} --{activate/deactivate} " +
-                "or without command --host {ip} --port {port} then will print status.");
+        String msg = e.getMessage();
 
-            return;
+        if (F.isEmpty(msg))
+            msg = e.getClass().getName();
+
+        if (msg.startsWith("Failed to handle request")) {
+            int p = msg.indexOf("err=");
+
+            msg = msg.substring(p + 4, msg.length() - 1);
         }
 
-        if (args.length > 5)
-            throw new IllegalArgumentException("incorrect number of arguments");
+        log("Error: " + msg);
+
+        return errCode;
+    }
+
+    /**
+     * Prints a command description, its {@code control.sh} invocation line and an empty line.
+     *
+     * @param desc Command description, printed first.
+     * @param cmd Command with its arguments, appended after the common host and port options.
+     */
+    private void usage(String desc, String cmd) {
+        log(desc);
+        log("    control.sh [--host HOST_OR_IP] [--port PORT] " + cmd);
+        nl();
+    }
+
+    /**
+     * Extracts the value that must follow an option; a value starting with {@code --} is rejected.
+     *
+     * @param it Arguments iterator.
+     * @param err Message of the exception thrown when the iterator has no more elements.
+     * @return Next argument value.
+     */
+    private String nextArg(Iterator<String> it, String err) {
+        if (it.hasNext()) {
+            String arg = it.next();
+
+            if (arg.startsWith("--")) // Another option appeared where a value was expected.
+                throw new IllegalArgumentException("Unexpected argument: " + arg);
+
+            return arg;
+        }
+
+        throw new IllegalArgumentException(err);
+    }
+
+    /**
+     * Activate cluster.
+     *
+     * @param client Client.
+     * @throws Throwable If failed to activate.
+     */
+    private void activate(GridClient client) throws Throwable {
+        try {
+            GridClientClusterState state = client.state();
+
+            state.active(true);
 
-        for (int i = 0; i < args.length; i++) {
-            String str = args[i];
+            log("Cluster activated");
+        }
+        catch (Throwable e) {
+            log("Failed to activate cluster.");
 
-            if ("--host".equals(str))
-                host = args[i + 1];
-            else if ("--port".equals(str))
-                port = args[i + 1];
-            else if ("--activate".equals(str))
-                activate = true;
-            else if ("--deactivate".equals(str))
-                activate = false;
+            throw e;
         }
+    }
 
-        if (host == null)
-            throw new IllegalArgumentException("host can not be empty");
+    /**
+     * Deactivate cluster.
+     *
+     * @param client Client.
+     * @throws Throwable If failed to deactivate.
+     */
+    private void deactivate(GridClient client) throws Throwable {
+        try {
+            GridClientClusterState state = client.state();
 
-        if (port == null)
-            throw new IllegalArgumentException("port can not be empty");
+            state.active(false);
 
-        GridClientConfiguration cfg = new GridClientConfiguration();
-        cfg.setServers(Collections.singletonList(host + ":" + port));
+            log("Cluster deactivated");
+        }
+        catch (Throwable e) {
+            log("Failed to deactivate cluster.");
 
-        try (GridClient client = GridClientFactory.start(cfg)) {
+            throw e;
+        }
+    }
+
+    /**
+     * Print cluster state.
+     *
+     * @param client Client.
+     * @throws Throwable If failed to print state.
+     */
+    private void state(GridClient client) throws Throwable {
+        try {
             GridClientClusterState state = client.state();
 
-            if (activate != null)
+            log("Cluster is " + (state.active() ? "active" : "inactive"));
+        }
+        catch (Throwable e) {
+            log("Failed to get cluster state.");
+
+            throw e;
+        }
+    }
+
+    /**
+     * Executes task {@code taskCls} with argument {@code taskArgs} on one connectable node picked by the client balancer.
+     * @param client Client used to obtain the compute projection and the list of nodes.
+     * @return Task result.
+     * @throws GridClientException If failed to execute task.
+     */
+    private <R> R executeTask(GridClient client, Class<?> taskCls, Object taskArgs) throws GridClientException {
+        GridClientCompute compute = client.compute();
+
+        List<GridClientNode> nodes = new ArrayList<>();
+
+        for (GridClientNode node : compute.nodes())
+            if (node.connectable())
+                nodes.add(node);
+
+        if (F.isEmpty(nodes))
+            throw new GridClientDisconnectedException("Connectable node not found", null);
+
+        GridClientNode node = compute.balancer().balancedNode(nodes);
+
+        return compute.projection(node).execute(taskCls.getName(),
+            new VisorTaskArgument<>(node.nodeId(), taskArgs, false));
+    }
+
+    /**
+     * Dispatches a baseline action to the matching handler.
+     *
+     * @param client Client.
+     * @param baselineAct Baseline action to execute; any value other than add, remove, set or version prints the baseline.
+     * @param baselineArgs Baseline action arguments.
+     * @throws Throwable If failed to execute baseline action.
+     */
+    private void baseline(GridClient client, String baselineAct, String baselineArgs) throws Throwable {
+        switch (baselineAct) {
+            case BASELINE_ADD:
+                baselineAdd(client, baselineArgs);
+                break;
+
+            case BASELINE_REMOVE:
+                baselineRemove(client, baselineArgs);
+                break;
+
+            case BASELINE_SET:
+                baselineSet(client, baselineArgs);
+                break;
+
+            case BASELINE_SET_VERSION:
+                baselineVersion(client, baselineArgs);
+                break;
+
+            default:
+                baselinePrint(client);
+        }
+    }
+
+    /**
+     * Prepare task argument.
+     *
+     * @param op Operation.
+     * @param s Argument from command line.
+     * @return Task argument.
+     */
+    private VisorBaselineTaskArg arg(VisorBaselineOperation op, String s) {
+        switch (op) {
+            case ADD:
+            case REMOVE:
+            case SET:
+                if(F.isEmpty(s))
+                    throw new IllegalArgumentException("Empty list of consistent IDs");
+
+                List<String> consistentIds = new ArrayList<>();
+
+                for (String consistentId : s.split(","))
+                    consistentIds.add(consistentId.trim());
+
+                return new VisorBaselineTaskArg(op, -1, consistentIds);
+
+            case VERSION:
                 try {
-                    state.active(activate);
+                    long topVer = Long.parseLong(s);
 
-                    System.out.println(host + ":" + port + " - was " + (activate ? "activate" : "deactivate"));
+                    return new VisorBaselineTaskArg(op, topVer, null);
                 }
-                catch (Exception e) {
-                    System.out.println("Something fail during " + (activate ? "activation" : "deactivation")
-                        + ", exception message: " + e.getMessage());
+                catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("Invalid topology version: " + s, e);
                 }
-            else
-                System.out.println(host + ":" + port + " - " + (state.active() ? "active" : "inactive"));
 
+            default:
+                return new VisorBaselineTaskArg(op, -1, null);
         }
     }
+
+    /**
+     * Prints cluster state, topology version, baseline nodes and, only when the baseline is not empty, other server nodes.
+     *
+     * @param res Task result with baseline topology.
+     */
+    private void baselinePrint0(VisorBaselineTaskResult res) {
+        log("Cluster state: " + (res.isActive() ? "active" : "inactive"));
+        log("Current topology version: " + res.getTopologyVersion());
+        nl();
+
+        Map<String, VisorBaselineNode> baseline = res.getBaseline();
+        Map<String, VisorBaselineNode> servers = res.getServers();
+
+        if (F.isEmpty(baseline))
+            log("Baseline nodes not found.");
+        else {
+            log("Baseline nodes:");
+
+            for(VisorBaselineNode node : baseline.values()) {
+                log("    ConsistentID=" + node.getConsistentId() + ", STATE=" +
+                    (servers.containsKey(node.getConsistentId()) ? "ONLINE" : "OFFLINE"));
+            }
+
+            log(DELIM);
+            log("Number of baseline nodes: " + baseline.size());
+
+            nl();
+
+            List<VisorBaselineNode> others = new ArrayList<>(); // Servers that are not in the baseline.
+
+            for (VisorBaselineNode node : servers.values()) {
+                if (!baseline.containsKey(node.getConsistentId()))
+                    others.add(node);
+            }
+
+            if (F.isEmpty(others))
+                log("Other nodes not found.");
+            else {
+                log("Other nodes:");
+
+                for(VisorBaselineNode node : others)
+                    log("    ConsistentID=" + node.getConsistentId());
+
+                log("Number of other nodes: " + others.size());
+            }
+        }
+    }
+
+    /**
+     * Collects the current baseline topology from the cluster via {@link VisorBaselineTask} and prints it.
+     *
+     * @param client Client used to execute the task.
+     */
+    private void baselinePrint(GridClient client) throws GridClientException {
+        VisorBaselineTaskResult res = executeTask(client, VisorBaselineTask.class, arg(COLLECT, ""));
+
+        baselinePrint0(res);
+    }
+
+    /**
+     * Adds nodes to baseline and prints the resulting baseline topology.
+     *
+     * @param client Client.
+     * @param baselineArgs Comma-separated consistent IDs of the nodes to add.
+     * @throws Throwable If failed to add nodes to baseline.
+     */
+    private void baselineAdd(GridClient client, String baselineArgs) throws Throwable {
+        try {
+            VisorBaselineTaskResult res = executeTask(client, VisorBaselineTask.class, arg(ADD, baselineArgs));
+
+            baselinePrint0(res);
+        }
+        catch (Throwable e) {
+            log("Failed to add nodes to baseline.");
+
+            throw e;
+        }
+    }
+
+    /**
+     * Removes nodes from baseline and prints the resulting baseline topology.
+     *
+     * @param client Client.
+     * @param consistentIds Comma-separated consistent IDs of the nodes to remove.
+     * @throws Throwable If failed to remove nodes from baseline.
+     */
+    private void baselineRemove(GridClient client, String consistentIds) throws Throwable {
+        try {
+            VisorBaselineTaskResult res = executeTask(client, VisorBaselineTask.class, arg(REMOVE, consistentIds));
+
+            baselinePrint0(res);
+        }
+        catch (Throwable e) {
+            log("Failed to remove nodes from baseline.");
+
+            throw e;
+        }
+    }
+
+    /**
+     * Sets baseline to the given nodes and prints the resulting baseline topology.
+     *
+     * @param client Client.
+     * @param consistentIds Comma-separated consistent IDs of the nodes that form the new baseline.
+     * @throws Throwable If failed to set baseline.
+     */
+    private void baselineSet(GridClient client, String consistentIds) throws Throwable {
+        try {
+            VisorBaselineTaskResult res = executeTask(client, VisorBaselineTask.class, arg(SET, consistentIds));
+
+            baselinePrint0(res);
+        }
+        catch (Throwable e) {
+            log("Failed to set baseline.");
+
+            throw e;
+        }
+    }
+
+    /**
+     * Sets baseline by topology version and prints the resulting baseline topology.
+     *
+     * @param client Client.
+     * @param arg Topology version as passed on the command line; it is parsed as a {@code long}.
+     */
+    private void baselineVersion(GridClient client, String arg) throws GridClientException {
+        try {
+            VisorBaselineTaskResult res = executeTask(client, VisorBaselineTask.class, arg(VERSION, arg));
+
+            baselinePrint0(res);
+        }
+        catch (Throwable e) {
+            log("Failed to set baseline with specified topology version.");
+
+            throw e;
+        }
+    }
+
+    /**
+     * @param e Exception to check.
+     * @return {@code true} if {@link X#hasCause} reports a {@link GridClientAuthenticationException} for specified exception.
+     */
+    private boolean isAuthError(Throwable e) {
+        return X.hasCause(e, GridClientAuthenticationException.class);
+    }
+
+    /**
+     * @param e Exception to check.
+     * @return {@code true} if specified exception itself is a connection error; its causes are not inspected.
+     */
+    private boolean isConnectionError(Throwable e) {
+        return e instanceof GridClientClosedException ||
+            e instanceof GridClientConnectionResetException ||
+            e instanceof GridClientDisconnectedException ||
+            e instanceof GridClientHandshakeException ||
+            e instanceof GridServerUnreachableException;
+    }
+
+    /**
+     * Prints the utility banner, parses the arguments and executes the single requested command.
+     *
+     * @param args Arguments to parse and execute; no arguments or a lone {@code --help} prints usage.
+     * @return {@link #EXIT_CODE_OK} on success or after printing usage, otherwise the code matching the failure.
+     */
+    public int execute(String... args) {
+        log("Control utility [ver. " + ACK_VER_STR + "]");
+        log(COPYRIGHT);
+        log("User: " + System.getProperty("user.name"));
+        log(DELIM);
+
+        try {
+            if (F.isEmpty(args) || (args.length == 1 && CMD_HELP.equalsIgnoreCase(args[0]))){
+                log("This utility can do the following commands:");
+
+                usage("  Activate cluster:", CMD_ACTIVATE);
+                usage("  Deactivate cluster:", CMD_DEACTIVATE);
+                usage("  Print current cluster state:", CMD_STATE);
+                usage("  Print cluster baseline topology:", CMD_BASE_LINE);
+                usage("  Add nodes into baseline topology:", CMD_BASE_LINE + " add consistentId1[,consistentId2,....,consistentIdN]");
+                usage("  Remove nodes from baseline topology:", CMD_BASE_LINE + " remove consistentId1[,consistentId2,....,consistentIdN]");
+                usage("  Set baseline topology:", CMD_BASE_LINE + " set consistentId1[,consistentId2,....,consistentIdN]");
+                usage("  Set baseline topology based on version:", CMD_BASE_LINE + " version topologyVersion");
+
+                log("Default values:");
+                log("    HOST_OR_IP=" + DFLT_HOST);
+                log("    PORT=" + DFLT_PORT);
+                nl();
+
+                log("Exit codes:");
+                log("    " + EXIT_CODE_OK + " - successful execution.");
+                log("    " + EXIT_CODE_INVALID_ARGUMENTS + " - invalid arguments.");
+                log("    " + EXIT_CODE_CONNECTION_FAILED + " - connection failed.");
+                log("    " + ERR_AUTHENTICATION_FAILED + " - authentication failed.");
+                log("    " + EXIT_CODE_UNEXPECTED_ERROR + " - unexpected error.");
+
+                return EXIT_CODE_OK;
+            }
+
+            String host = DFLT_HOST;
+
+            String port = DFLT_PORT;
+
+            String baselineAct = "";
+
+            String baselineArgs = "";
+
+            List<String> commands = new ArrayList<>();
+
+            Iterator<String> it = Arrays.asList(args).iterator();
+
+            while (it.hasNext()) {
+                String str = it.next().toLowerCase(); // Option names are matched case-insensitively.
+
+                switch (str) {
+                    case CMD_HOST:
+                        host = nextArg(it, "Expected host name");
+                        break;
+
+                    case CMD_PORT:
+                        port = nextArg(it, "Expected port number");
+
+                        try {
+                            int p = Integer.parseInt(port);
+
+                            if (p <= 0 || p > 65535)
+                                throw new IllegalArgumentException("Invalid value for port: " + port);
+                        }
+                        catch (NumberFormatException ignored) {
+                            throw new IllegalArgumentException("Invalid value for port: " + port);
+                        }
+                        break;
+
+                    case CMD_ACTIVATE:
+                    case CMD_DEACTIVATE:
+                    case CMD_STATE:
+                        commands.add(str);
+                        break;
+
+                    case CMD_BASE_LINE:
+                        commands.add(CMD_BASE_LINE);
+
+                        if (it.hasNext()) { // A bare "--baseline" leaves baselineAct empty, which prints the baseline.
+                            baselineAct = it.next().toLowerCase();
+
+                            if (BASELINE_ADD.equals(baselineAct) || BASELINE_REMOVE.equals(baselineAct) ||
+                                BASELINE_SET.equals(baselineAct) || BASELINE_SET_VERSION.equals(baselineAct))
+                                baselineArgs = nextArg(it, "Expected baseline arguments");
+                            else
+                                throw new IllegalArgumentException("Unexpected argument for " + CMD_BASE_LINE + ": "
+                                        + baselineAct);
+                        }
+
+                } // NOTE(review): no default branch, so unrecognized arguments are silently skipped.
+            }
+
+            int sz = commands.size();
+
+            if (sz < 1)
+                throw new IllegalArgumentException("No action was specified");
+
+            if (sz > 1)
+                throw new IllegalArgumentException("Only one action can be specified, but found: " + sz);
+
+            GridClientConfiguration cfg = new GridClientConfiguration();
+
+            cfg.setServers(Collections.singletonList(host + ":" + port));
+
+            try (GridClient client = GridClientFactory.start(cfg)) {
+                String cmd = commands.get(0);
+
+                switch (cmd) {
+                    case CMD_ACTIVATE:
+                        activate(client);
+                        break;
+
+                    case CMD_DEACTIVATE:
+                        deactivate(client);
+                        break;
+
+                    case CMD_STATE:
+                        state(client);
+                        break;
+
+                    case CMD_BASE_LINE:
+                        baseline(client, baselineAct, baselineArgs);
+                        break;
+                }
+            }
+
+            return 0; // Same value as EXIT_CODE_OK.
+        }
+        catch (IllegalArgumentException e) {
+            return error(EXIT_CODE_INVALID_ARGUMENTS, "Check arguments.", e);
+        }
+        catch (Throwable e) {
+            if (isAuthError(e))
+                return error(ERR_AUTHENTICATION_FAILED, "Authentication error.", e);
+
+            if (isConnectionError(e))
+                return error(EXIT_CODE_CONNECTION_FAILED, "Connection to cluster failed.", e);
+
+
+            return error(EXIT_CODE_UNEXPECTED_ERROR, "", e);
+        }
+    }
+
+    /**
+     * @param args Command line arguments; the JVM exits with the code returned by {@link #execute(String...)}.
+     */
+    public static void main(String[] args) {
+        CommandHandler hnd = new CommandHandler();
+
+        System.exit(hnd.execute(args));
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
index a151eb5..74f5a10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
@@ -558,6 +558,20 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
                         return null;
                     }
 
+                    @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node, DiscoveryDataBag discoData) {
+                        for (GridComponent comp : ctx) {
+                            if (comp.discoveryDataType() == null)
+                                continue;
+
+                            IgniteNodeValidationResult err = comp.validateNode(node, discoData.newJoinerDiscoveryData(comp.discoveryDataType().ordinal()));
+
+                            if (err != null)
+                                return err;
+                        }
+
+                        return null;
+                    }
+
                     @Override public Collection<SecuritySubject> authenticatedSubjects() {
                         try {
                             return ctx.security().authenticatedSubjects();
@@ -705,6 +719,11 @@ public abstract class GridManagerAdapter<T extends IgniteSpi> implements GridMan
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node, DiscoveryDataBag.JoiningNodeDiscoveryData discoData) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public final String toString() {
         return S.toString(GridManagerAdapter.class, this, "name", getClass().getName());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java
index c524331..59f773d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java
@@ -19,10 +19,11 @@ package org.apache.ignite.internal.managers.discovery;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cluster.BaselineTopology;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
@@ -31,71 +32,104 @@ import org.jetbrains.annotations.Nullable;
  */
 public class ConsistentIdMapper {
     /** Discovery manager. */
-    private final GridDiscoveryManager discoveryManager;
+    private final GridDiscoveryManager discoveryMgr;
 
     /**
      * Create an instance of mapper.
      *
-     * @param discoveryManager Discovery manager.
+     * @param discoveryMgr Discovery manager.
      */
-    public ConsistentIdMapper(GridDiscoveryManager discoveryManager) {
-        this.discoveryManager = discoveryManager;
+    public ConsistentIdMapper(GridDiscoveryManager discoveryMgr) {
+        this.discoveryMgr = discoveryMgr;
     }
 
     /**
-     * Map UUID to consistent id.
+     * Maps UUID to compact ID for given baseline topology.
      *
      * @param topVer Topology version.
      * @param nodeId UUID of node.
-     * @return Consistent id of node.
+     * @return Compact ID of node for given baseline topology.
      */
-    public Object mapToConsistentId(AffinityTopologyVersion topVer, UUID nodeId) {
-        ClusterNode node = discoveryManager.node(topVer, nodeId);
+    public short mapToCompactId(AffinityTopologyVersion topVer, UUID nodeId) {
+        Map<UUID, Short> m = discoveryMgr.consistentId(topVer);
 
-        if (node == null)
-            throw new IllegalStateException("Unable to find node by UUID [nodeId=" + nodeId + ", topVer=" + topVer + ']');
+        if (m == null)
+            throw new IllegalStateException("Unable to find consistent id map [topVer" + topVer + ']');
 
-        return node.consistentId();
+        Short constId = m.get(nodeId);
+
+        if (constId == null)
+            throw new IllegalStateException("Unable to find consistentId by UUID [nodeId=" + nodeId + ", topVer=" + topVer + ']');
+
+        return constId;
     }
 
     /**
-     * Map consistent id to UUID.
+     * Maps compact ID to UUID for given baseline topology.
      *
-     * @param consistentId Consistent id of node.
-     * @return UUID of node.
+     * @param topVer Topology version.
+     * @param nodeConstId Compact ID of node.
+     * @return UUID of node for given baseline topology, or {@code null} if there is no ID map for the topology version.
      */
-    @Nullable public UUID mapToUUID(Object consistentId) {
-        for (ClusterNode node : discoveryManager.allNodes())
-            if (node.consistentId().equals(consistentId))
-                return node.id();
+    public UUID mapToUuid(AffinityTopologyVersion topVer, short nodeConstId) {
+        Map<Short, UUID> map = discoveryMgr.nodeIdMap(topVer);
+
+        if (map == null)
+            return null;
+
+        UUID constId = map.get(nodeConstId);
 
-        return null;
+        if (constId == null)
+            throw new IllegalStateException("Unable to find UUID by constId [nodeId=" + nodeConstId + ", topVer=" + topVer + ']');
+
+        return constId;
     }
 
     /**
-     * Map primary -> backup node UUIDs to consistent ids.
+     * Maps primary -> backup node UUIDs to compact IDs according to baseline topology.
      *
      * @param txNodes Primary -> backup UUID nodes.
-     * @return Primary -> backup consistent id nodes.
+     * @return Primary -> backup compact ID nodes.
      */
-    public Map<Object, Collection<Object>> mapToConsistentIds(AffinityTopologyVersion topVer, @Nullable Map<UUID, Collection<UUID>> txNodes) {
+    public Map<Short, Collection<Short>> mapToCompactIds(
+        AffinityTopologyVersion topVer,
+        @Nullable Map<UUID, Collection<UUID>> txNodes,
+        BaselineTopology baselineTop
+    ) {
         if (txNodes == null)
             return null;
 
-        Map<Object, Collection<Object>> consistentMap = U.newHashMap(txNodes.keySet().size());
+        Map<Object, Short> constIdMap = baselineTop.consistentIdMapping();
+
+        Map<UUID, Short> m = discoveryMgr.consistentId(topVer);
 
-        for (UUID node : txNodes.keySet()) {
-            Collection<UUID> backupNodes = txNodes.get(node);
+        int bltNodes = m.size();
 
-            Collection<Object> consistentIdsBackups = new ArrayList<>(backupNodes.size());
+        Map<Short, Collection<Short>> consistentMap = U.newHashMap(txNodes.size());
 
-            for (UUID backup : backupNodes)
-                consistentIdsBackups.add(mapToConsistentId(topVer, backup));
+        int nodeCnt = 0;
 
-            consistentMap.put(mapToConsistentId(topVer, node), consistentIdsBackups);
+        for (Map.Entry<UUID, Collection<UUID>> e : txNodes.entrySet()) {
+            UUID node = e.getKey();
+
+            Collection<UUID> backupNodes = e.getValue();
+
+            Collection<Short> backups = new ArrayList<>(backupNodes.size());
+
+            for (UUID backup : backupNodes) {
+                if (m.containsKey(backup))
+                    nodeCnt++;
+
+                backups.add(mapToCompactId(topVer, backup));
+            }
+
+            // Optimization: store a single marker value instead of the full set of nodes.
+            if (backups.size() == nodeCnt && nodeCnt == (bltNodes - 1))
+                backups = Collections.singletonList(Short.MAX_VALUE);
+
+            consistentMap.put(mapToCompactId(topVer, node), backups);
         }
 
         return consistentMap;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
index 9ed70aa..aa47168 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java
@@ -23,11 +23,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import org.apache.ignite.cluster.BaselineNode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
 import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -40,6 +42,13 @@ import org.jetbrains.annotations.Nullable;
  */
 public class DiscoCache {
     /** */
+    private static final C1<BaselineNode, ClusterNode> BASELINE_TO_CLUSTER = new C1<BaselineNode, ClusterNode>() {
+        @Override public ClusterNode apply(BaselineNode baselineNode) {
+            return (ClusterNode)baselineNode;
+        }
+    };
+
+    /** */
     private final DiscoveryDataClusterState state;
 
     /** Local node. */
@@ -57,6 +66,9 @@ public class DiscoCache {
     /** Daemon nodes. */
     private final List<ClusterNode> daemonNodes;
 
+    /** Baseline nodes. */
+    private final List<? extends BaselineNode> baselineNodes;
+
     /** All remote nodes with at least one cache configured. */
     @GridToStringInclude
     private final List<ClusterNode> rmtNodesWithCaches;
@@ -81,6 +93,18 @@ public class DiscoCache {
     /** */
     private final AffinityTopologyVersion topVer;
 
+    /** Node ID to compacted consistent ID mapping, {@code null} if baseline topology is not set. */
+    final Map<UUID, Short> nodeIdToConsIdx;
+
+    /** Compacted consistent ID to node ID mapping, {@code null} if baseline topology is not set. */
+    final Map<Short, UUID> consIdxToNodeId;
+
+    /** Accepts baseline nodes that are {@link ClusterNode}s whose ID is in the alive set. */
+    private final P1<BaselineNode> aliveBaselineNodePred;
+
+    /** Accepts cluster nodes whose ID is in the alive set. */
+    private final P1<ClusterNode> aliveNodePred;
+
     /**
      * @param topVer Topology version.
      * @param state Current cluster state.
@@ -90,10 +114,13 @@ public class DiscoCache {
      * @param srvNodes Server nodes.
      * @param daemonNodes Daemon nodes.
      * @param rmtNodesWithCaches Remote nodes with at least one cache configured.
+     * @param baselineNodes Baseline nodes or {@code null} if baseline was not set.
      * @param allCacheNodes Cache nodes by cache name.
      * @param cacheGrpAffNodes Affinity nodes by cache group ID.
      * @param nodeMap Node map.
-     * @param alives Alive nodes.
+     * @param alives0 Alive nodes.
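+     * @param nodeIdToConsIdx Node ID to compacted consistent ID mapping or {@code null} if baseline was not set.
+     * @param consIdxToNodeId Compacted consistent ID to node ID mapping or {@code null} if baseline was not set.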
      * @param minNodeVer Minimum node version.
      */
     DiscoCache(
@@ -105,11 +132,15 @@ public class DiscoCache {
         List<ClusterNode> srvNodes,
         List<ClusterNode> daemonNodes,
         List<ClusterNode> rmtNodesWithCaches,
+        @Nullable List<? extends BaselineNode> baselineNodes,
         Map<Integer, List<ClusterNode>> allCacheNodes,
         Map<Integer, List<ClusterNode>> cacheGrpAffNodes,
         Map<UUID, ClusterNode> nodeMap,
-        Set<UUID> alives,
-        IgniteProductVersion minNodeVer) {
+        Set<UUID> alives0,
+        @Nullable Map<UUID, Short> nodeIdToConsIdx,
+        @Nullable  Map<Short, UUID> consIdxToNodeId,
+        IgniteProductVersion minNodeVer
+    ) {
         this.topVer = topVer;
         this.state = state;
         this.loc = loc;
@@ -118,11 +149,28 @@ public class DiscoCache {
         this.srvNodes = srvNodes;
         this.daemonNodes = daemonNodes;
         this.rmtNodesWithCaches = rmtNodesWithCaches;
+        this.baselineNodes = baselineNodes;
         this.allCacheNodes = allCacheNodes;
         this.cacheGrpAffNodes = cacheGrpAffNodes;
         this.nodeMap = nodeMap;
-        this.alives.addAll(alives);
+        alives.addAll(alives0);
         this.minNodeVer = minNodeVer;
+        this.nodeIdToConsIdx = nodeIdToConsIdx;
+        this.consIdxToNodeId = consIdxToNodeId;
+
+        aliveBaselineNodePred = new P1<BaselineNode>() {
+            @Override
+            public boolean apply(BaselineNode node) {
+                return node instanceof ClusterNode && alives.contains(((ClusterNode)node).id());
+
+            }
+        };
+
+        aliveNodePred = new P1<ClusterNode>() {
+            @Override public boolean apply(ClusterNode node) {
+                return alives.contains(node.id());
+            }
+        };
     }
 
     /**
@@ -156,6 +204,15 @@ public class DiscoCache {
         return rmtNodes;
     }
 
+    /**
+     * Returns a collection of baseline nodes.
+     *
+     * @return A collection of baseline nodes or {@code null} if baseline topology was not set.
+     */
+    @Nullable public List<? extends BaselineNode> baselineNodes() {
+        return baselineNodes;
+    }
+
     /** @return All nodes. */
     public List<ClusterNode> allNodes() {
         return allNodes;
@@ -171,17 +228,23 @@ public class DiscoCache {
         return daemonNodes;
     }
 
+    /** @return Map of node ID to compacted consistent ID, or {@code null} if baseline topology was not set. */
+    public Map<UUID, Short> consistentIdMap() {
+        return nodeIdToConsIdx;
+    }
+
+    /** @return Map of compacted consistent ID to node ID, or {@code null} if baseline topology was not set. */
+    public Map<Short, UUID> nodeIdMap() {
+        return consIdxToNodeId;
+    }
+
     /**
      * Gets all alive remote nodes that have at least one cache configured.
      *
      * @return Collection of nodes.
      */
     public Collection<ClusterNode> remoteAliveNodesWithCaches() {
-        return F.view(rmtNodesWithCaches, new P1<ClusterNode>() {
-            @Override public boolean apply(ClusterNode node) {
-                return alives.contains(node.id());
-            }
-        });
+        return F.view(rmtNodesWithCaches, aliveNodePred);
     }
 
     /**
@@ -190,17 +253,26 @@ public class DiscoCache {
      * @return Collection of nodes.
      */
     public Collection<ClusterNode> aliveServerNodes() {
-        return F.view(serverNodes(), new P1<ClusterNode>() {
-            @Override public boolean apply(ClusterNode node) {
-                return alives.contains(node.id());
-            }
-        });
+        return F.view(serverNodes(), aliveNodePred);
+    }
+
+    /**
+     * Returns a collection of live baseline nodes.
+     *
+     * @return A view of baseline nodes that are currently present in the cluster or {@code null} if baseline
+     *      topology was not set.
+     */
+    @Nullable public Collection<ClusterNode> aliveBaselineNodes() {
+        return baselineNodes == null ? null : F.viewReadOnly(baselineNodes, BASELINE_TO_CLUSTER, aliveBaselineNodePred);
+
     }
 
     /**
      * @return Oldest alive server node.
      */
-    public @Nullable ClusterNode oldestAliveServerNode(){
+    @SuppressWarnings("ForLoopReplaceableByForEach")
+    @Nullable public ClusterNode oldestAliveServerNode(){
+        // Avoid iterator allocation.
         for (int i = 0; i < srvNodes.size(); i++) {
             ClusterNode srv = srvNodes.get(i);
 
@@ -330,10 +402,13 @@ public class DiscoCache {
             srvNodes,
             daemonNodes,
             rmtNodesWithCaches,
+            baselineNodes,
             allCacheNodes,
             cacheGrpAffNodes,
             nodeMap,
             alives,
+            nodeIdToConsIdx,
+            consIdxToNodeId,
             minNodeVer);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 088c9ce..65cc666 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -50,6 +50,7 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.BaselineNode;
 import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -59,10 +60,10 @@ import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridComponent;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.GridNodeOrderComparator;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.cluster.NodeOrderComparator;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.GridManagerAdapter;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -74,12 +75,12 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cluster.BaselineTopology;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
 import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
-import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
+import org.apache.ignite.internal.processors.cluster.IGridClusterStateProcessor;
 import org.apache.ignite.internal.processors.security.SecurityContext;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridAtomicLong;
@@ -705,11 +706,19 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     Snapshot snapshot = topSnap.get();
 
                     if (customMsg == null) {
-                        discoCache = createDiscoCache(nextTopVer,
+                        discoCache = createDiscoCache(
+                            nextTopVer,
                             ctx.state().clusterState(),
                             locNode,
                             topSnapshot);
                     }
+                    else if (customMsg instanceof ChangeGlobalStateMessage) {
+                        discoCache = createDiscoCache(
+                            nextTopVer,
+                            ctx.state().pendingState((ChangeGlobalStateMessage)customMsg),
+                            locNode,
+                            topSnapshot);
+                    }
                     else
                         discoCache = customMsg.createDiscoCache(GridDiscoveryManager.this, nextTopVer, snapshot.discoCache);
 
@@ -784,7 +793,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
                     topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO,
                         createDiscoCache(AffinityTopologyVersion.ZERO, ctx.state().clusterState(), locNode,
-                            Collections.<ClusterNode>singleton(locNode))
+                            Collections.singleton(locNode))
                     ));
                 }
                 else if (type == EVT_CLIENT_NODE_RECONNECTED) {
@@ -846,7 +855,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
                 if (ctx.localNodeId().equals(dataBag.joiningNodeId())) {
                     // NodeAdded msg reached joining node after round-trip over the ring.
-                    GridClusterStateProcessor stateProc = ctx.state();
+                    IGridClusterStateProcessor stateProc = ctx.state();
 
                     stateProc.onGridDataReceived(dataBag.gridDiscoveryData(
                         stateProc.discoveryDataType().ordinal()));
@@ -858,7 +867,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 }
                 else {
                     // Discovery data from newly joined node has to be applied to the current old node.
-                    GridClusterStateProcessor stateProc = ctx.state();
+                    IGridClusterStateProcessor stateProc = ctx.state();
 
                     JoiningNodeDiscoveryData data0 = dataBag.newJoinerDiscoveryData(
                         stateProc.discoveryDataType().ordinal());
@@ -1810,6 +1819,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * @param topVer Topology version.
+     * @return All baseline nodes for given topology version or {@code null} if baseline was not set for the
+     *      given topology version.
+     */
+    @Nullable public List<? extends BaselineNode> baselineNodes(AffinityTopologyVersion topVer) {
+        return resolveDiscoCache(CU.cacheId(null), topVer).baselineNodes();
+    }
+
+    /**
      * Gets node from history for given topology version.
      *
      * @param topVer Topology version.
@@ -1821,6 +1839,26 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * Gets node ID to compacted consistent ID mapping from history for given topology version.
+     *
+     * @param topVer Topology version.
+     * @return Map of node ID to compacted consistent ID, or {@code null} if baseline topology was not set.
+     */
+    public Map<UUID, Short> consistentId(AffinityTopologyVersion topVer) {
+        return resolveDiscoCache(CU.cacheId(null), topVer).consistentIdMap();
+    }
+
+    /**
+     * Gets compacted consistent ID to node ID mapping from history for given topology version.
+     *
+     * @param topVer Topology version.
+     * @return Map of compacted consistent ID to node ID, or {@code null} if baseline topology was not set.
+     */
+    public  Map<Short, UUID> nodeIdMap(AffinityTopologyVersion topVer) {
+        return resolveDiscoCache(CU.cacheId(null), topVer).nodeIdMap();
+    }
+
+    /**
      * Gets cache nodes for cache with given name.
      *
      * @param cacheName Cache name.
@@ -2004,7 +2042,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
     /**
      * @return Consistent ID.
-     * @deprecated Use PdsConsistentIdProcessor to get actual consistent ID
+     * @deprecated Use {@link ClusterNode#consistentId()} of local node to get actual consistent ID.
      */
     @Deprecated
     public Serializable consistentId() {
@@ -2206,6 +2244,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         ArrayList<ClusterNode> rmtNodes = new ArrayList<>(topSnapshot.size());
         ArrayList<ClusterNode> allNodes = new ArrayList<>(topSnapshot.size());
 
+        Map<UUID, Short> nodeIdToConsIdx;
+        Map<Short, UUID> consIdxToNodeId;
+        List<? extends BaselineNode> baselineNodes;
+
         IgniteProductVersion minVer = null;
 
         for (ClusterNode node : topSnapshot) {
@@ -2237,10 +2279,52 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
 
         Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size());
         Map<Integer, List<ClusterNode>> cacheGrpAffNodes = U.newHashMap(allNodes.size());
-        Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+        Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(NodeOrderComparator.getInstance());
 
         fillAffinityNodeCaches(allNodes, allCacheNodes, cacheGrpAffNodes, rmtNodesWithCaches);
 
+        BaselineTopology blt = state.baselineTopology();
+
+        if (blt != null) {
+            nodeIdToConsIdx = U.newHashMap(srvNodes.size());
+            consIdxToNodeId = U.newHashMap(srvNodes.size());
+
+            Map<Object, Short> m = blt.consistentIdMapping();
+
+            Map<Object, ClusterNode> aliveNodesByConsId = U.newHashMap(srvNodes.size());
+
+            for (ClusterNode node : srvNodes) {
+                Short compactedId = m.get(node.consistentId());
+
+                if (compactedId != null) {
+                    nodeIdToConsIdx.put(node.id(), compactedId);
+
+                    consIdxToNodeId.put(compactedId, node.id());
+                }
+
+                aliveNodesByConsId.put(node.consistentId(), node);
+            }
+
+            List<BaselineNode >baselineNodes0 = new ArrayList<>(blt.size());
+
+            for (Object consId : blt.consistentIds()) {
+                ClusterNode srvNode = aliveNodesByConsId.get(consId);
+
+                if (srvNode != null)
+                    baselineNodes0.add(srvNode);
+                else
+                    baselineNodes0.add(blt.baselineNode(consId));
+            }
+
+            baselineNodes = baselineNodes0;
+        }
+        else {
+            nodeIdToConsIdx = null;
+            consIdxToNodeId = null;
+
+            baselineNodes = null;
+        }
+
         return new DiscoCache(
             topVer,
             state,
@@ -2250,10 +2334,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             Collections.unmodifiableList(srvNodes),
             Collections.unmodifiableList(daemonNodes),
             U.sealList(rmtNodesWithCaches),
+            baselineNodes == null ? null : Collections.unmodifiableList(baselineNodes),
             Collections.unmodifiableMap(allCacheNodes),
             Collections.unmodifiableMap(cacheGrpAffNodes),
             Collections.unmodifiableMap(nodeMap),
             alives,
+            nodeIdToConsIdx == null ? null : Collections.unmodifiableMap(nodeIdToConsIdx),
+            consIdxToNodeId == null ? null : Collections.unmodifiableMap(consIdxToNodeId),
             minVer);
     }
 
@@ -2364,8 +2451,12 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                         discoWrk.addEvent(EVT_NODE_SEGMENTED,
                             AffinityTopologyVersion.NONE,
                             node,
-                            createDiscoCache(AffinityTopologyVersion.NONE, null, node, locNodeOnlyTop),
-                            locNodeOnlyTop,
+                            createDiscoCache(
+                                AffinityTopologyVersion.NONE,
+                                ctx.state().clusterState(),
+                                node,
+                                locNodeOnlyTop
+                            ), locNodeOnlyTop,
                             null);
 
                         lastSegChkRes.set(false);
@@ -3056,12 +3147,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
      * @param discoCache Current disco cache.
      * @return New discovery cache.
      */
-    public DiscoCache createDiscoCacheOnCacheChange(AffinityTopologyVersion topVer,
-        DiscoCache discoCache) {
+    public DiscoCache createDiscoCacheOnCacheChange(
+        AffinityTopologyVersion topVer,
+        DiscoCache discoCache
+    ) {
         List<ClusterNode> allNodes = discoCache.allNodes();
         Map<Integer, List<ClusterNode>> allCacheNodes = U.newHashMap(allNodes.size());
         Map<Integer, List<ClusterNode>> cacheGrpAffNodes = U.newHashMap(allNodes.size());
-        Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE);
+        Set<ClusterNode> rmtNodesWithCaches = new TreeSet<>(NodeOrderComparator.getInstance());
 
         fillAffinityNodeCaches(allNodes, allCacheNodes, cacheGrpAffNodes, rmtNodesWithCaches);
 
@@ -3074,10 +3167,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             discoCache.serverNodes(),
             discoCache.daemonNodes(),
             U.sealList(rmtNodesWithCaches),
+            discoCache.baselineNodes(),
             allCacheNodes,
             cacheGrpAffNodes,
             discoCache.nodeMap,
             discoCache.alives,
+            discoCache.nodeIdToConsIdx,
+            discoCache.consIdxToNodeId,
             discoCache.minimumNodeVersion());
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/mem/unsafe/UnsafeMemoryProvider.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/mem/unsafe/UnsafeMemoryProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/mem/unsafe/UnsafeMemoryProvider.java
index bf6e807..276e10e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/mem/unsafe/UnsafeMemoryProvider.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/mem/unsafe/UnsafeMemoryProvider.java
@@ -57,13 +57,15 @@ public class UnsafeMemoryProvider implements DirectMemoryProvider {
 
     /** {@inheritDoc} */
     @Override public void shutdown() {
-        for (Iterator<DirectMemoryRegion> it = regions.iterator(); it.hasNext(); ) {
-            DirectMemoryRegion chunk = it.next();
+        if (regions != null) {
+            for (Iterator<DirectMemoryRegion> it = regions.iterator(); it.hasNext(); ) {
+                DirectMemoryRegion chunk = it.next();
 
-            GridUnsafe.freeMemory(chunk.address());
+                GridUnsafe.freeMemory(chunk.address());
 
-            // Safety.
-            it.remove();
+                // Safety.
+                it.remove();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
index 64c5927..6802a3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java
@@ -52,6 +52,11 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh
         throws IgniteCheckedException;
 
     /**
+     * Initializes disk cache store structures.
+     */
+    public void initializeForMetastorage() throws IgniteCheckedException;
+
+    /**
      * Callback called when a cache is stopping. After this callback is invoked, no data associated with
      * the given cache will be stored on disk.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
index 42d9611..19b47e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
@@ -96,10 +96,11 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni
      * the given pointer will be kept because there is a configurable WAL history size. Those entries may be used
      * for partial partition rebalancing.
      *
-     * @param ptr Pointer for which it is safe to clear the log.
+     * @param low Pointer from which WAL will be truncated. If {@code null}, WAL is truncated from the oldest segment.
+     * @param high Pointer for which it is safe to clear the log.
      * @return Number of deleted WAL segments.
      */
-    public int truncate(WALPointer ptr);
+    public int truncate(WALPointer low, WALPointer high);
 
     /**
      * Gives a hint to WAL manager to compact WAL until given pointer (exclusively).

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/BaselineTopologyRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/BaselineTopologyRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/BaselineTopologyRecord.java
new file mode 100644
index 0000000..48b60b3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/BaselineTopologyRecord.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * WAL record that stores a baseline topology ID together with its compact node ID to consistent node ID mapping.
+ */
+public class BaselineTopologyRecord extends WALRecord {
+    /** Baseline topology ID. */
+    private int id;
+
+    /** Compact ID to consistent ID mapping. */
+    private Map<Short, Object> mapping;
+
+    /**
+     * Private no-arg constructor that leaves {@code id} and {@code mapping} unset.
+     */
+    private BaselineTopologyRecord() {
+        // No-op. NOTE(review): no factory method exists in this class - confirm this constructor is still needed.
+    }
+
+    /**
+     * @param id Baseline topology ID.
+     * @param mapping Compact ID to consistent ID mapping (stored by reference, not copied).
+     */
+    public BaselineTopologyRecord(int id, Map<Short, Object> mapping) {
+        this.id = id;
+        this.mapping = mapping;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RecordType type() {
+        return RecordType.BASELINE_TOP_RECORD;
+    }
+
+    /**
+     * Returns baseline topology ID.
+     *
+     * @return Baseline topology ID.
+     */
+    public int id() {
+        return id;
+    }
+
+    /**
+     * Returns the stored compact ID to consistent ID mapping without copying it.
+     *
+     * @return Compact ID to consistent ID mapping.
+     */
+    public Map<Short, Object> mapping() {
+        return mapping;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(BaselineTopologyRecord.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CacheState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CacheState.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CacheState.java
index 41d38d0..49025c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CacheState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CacheState.java
@@ -30,6 +30,9 @@ public class CacheState {
     private long[] vals;
 
     /** */
+    private byte[] states;
+
+    /** */
     private int idx;
 
     /**
@@ -38,6 +41,7 @@ public class CacheState {
     public CacheState(int partsCnt) {
         parts = new int[partsCnt];
         vals = new long[partsCnt * 2];
+        states = new byte[partsCnt];
     }
 
     /**
@@ -46,6 +50,16 @@ public class CacheState {
      * @param cntr Partition counter.
      */
     public void addPartitionState(int partId, long size, long cntr) {
+        addPartitionState(partId, size, cntr, (byte)-1);
+    }
+
+    /**
+     * @param partId Partition ID to add.
+     * @param size Partition size.
+     * @param cntr Partition counter.
+     * @param state Partition state.
+     */
+    public void addPartitionState(int partId, long size, long cntr, byte state) {
         if (idx == parts.length)
             throw new IllegalStateException("Failed to add new partition to the partitions state " +
                 "(no enough space reserved) [partId=" + partId + ", reserved=" + parts.length + ']');
@@ -57,6 +71,8 @@ public class CacheState {
         }
 
         parts[idx] = partId;
+        states[idx] = state;
+
         vals[2 * idx] = size;
         vals[2 * idx + 1] = cntr;
 
@@ -97,6 +113,14 @@ public class CacheState {
 
     /**
      * @param idx Index to get.
+     * @return Partition state by index.
+     */
+    public byte stateByIndex(int idx) {
+        return states[idx];
+    }
+
+    /**
+     * @param idx Index to get.
      * @return Partition size by index.
      */
     public long partitionSizeByIndex(int idx) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
index cb6b482..3511aff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java
@@ -97,8 +97,8 @@ public class DataEntry {
         this.partId = partId;
         this.partCnt = partCnt;
 
-        // Only CREATE, UPDATE and DELETE operations should be stored in WAL.
-        assert op == GridCacheOperation.CREATE || op == GridCacheOperation.UPDATE || op == GridCacheOperation.DELETE : op;
+        // Only READ, CREATE, UPDATE and DELETE operations should be stored in WAL.
+        assert op == GridCacheOperation.READ || op == GridCacheOperation.CREATE || op == GridCacheOperation.UPDATE || op == GridCacheOperation.DELETE : op;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
index ac569bd..7a4d6b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java
@@ -21,6 +21,7 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  * Logical data record with cache operation description.
@@ -48,19 +49,18 @@ public class DataRecord extends TimeStampRecord {
      * @param writeEntry Write entry.
      */
     public DataRecord(DataEntry writeEntry) {
-        this(Collections.singletonList(writeEntry));
+        this(writeEntry, U.currentTimeMillis());
     }
 
     /**
      * @param writeEntries Write entries.
      */
     public DataRecord(List<DataEntry> writeEntries) {
-        this.writeEntries = writeEntries;
+        this(writeEntries, U.currentTimeMillis());
     }
 
     /**
      * @param writeEntry Write entry.
-     * @param timestamp TimeStamp.
      */
     public DataRecord(DataEntry writeEntry, long timestamp) {
         this(Collections.singletonList(writeEntry), timestamp);

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/ExchangeRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/ExchangeRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/ExchangeRecord.java
new file mode 100644
index 0000000..87bc4e6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/ExchangeRecord.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Partition map exchange WAL record: stores a const id and an exchange {@link Type} along with a timestamp.
+ */
+public class ExchangeRecord extends TimeStampRecord {
+    /** Const id (presumably a node's compact ID within the baseline topology; TODO confirm against callers). */
+    private Short constId;
+
+    /** Exchange type: {@link Type#JOIN} or {@link Type#LEFT}. */
+    private Type type;
+
+    /**
+     * @param constId Const id.
+     * @param type Exchange type.
+     * @param timeStamp Record timestamp, passed through to the {@link TimeStampRecord} constructor.
+     */
+    public ExchangeRecord(Short constId, Type type, long timeStamp) {
+        super(timeStamp);
+
+        this.constId = constId;
+        this.type = type;
+    }
+
+    /**
+     * @param constId Const id.
+     * @param type Exchange type (no timestamp is passed here; the parent's no-arg constructor is used).
+     */
+    public ExchangeRecord(Short constId, Type type) {
+        this.constId = constId;
+        this.type = type;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RecordType type() {
+        return RecordType.EXCHANGE;
+    }
+
+    /**
+     * @return Const id given at construction.
+     */
+    public Short getConstId() {
+        return constId;
+    }
+
+    /**
+     * @return Exchange type given at construction.
+     */
+    public Type getType() {
+        return type;
+    }
+
+    /**
+     * Exchange type stored in an {@link ExchangeRecord}.
+     */
+    public enum Type {
+        /** Join. */
+        JOIN,
+        /** Left. */
+        LEFT
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ExchangeRecord.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java
index 519e825..e077e5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java
@@ -24,6 +24,9 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRe
  * {@link AbstractWalRecordsIterator}.
  */
 public class FilteredRecord extends WALRecord {
+    /** Instance. */
+    public static final FilteredRecord INSTANCE = new FilteredRecord();
+
     /** {@inheritDoc} */
     @Override public RecordType type() {
         return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MetastoreDataRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MetastoreDataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MetastoreDataRecord.java
new file mode 100644
index 0000000..e269de2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MetastoreDataRecord.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * WAL record holding a metastore entry: a string key and an optional byte-array value.
+ */
+public class MetastoreDataRecord extends WALRecord {
+    /** Key. */
+    private final String key;
+
+    /** Value; may be {@code null}. */
+    @Nullable private final byte[] value;
+
+    /**
+     * @param key Key.
+     * @param value Value, may be {@code null}; the array reference is stored as is (no copy is made).
+     */
+    public MetastoreDataRecord(String key, @Nullable byte[] value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    /** @return Key given at construction. */
+    public String key() {
+        return key;
+    }
+
+    /** @return Value given at construction (the stored array itself, not a copy); may be {@code null}. */
+    @Nullable public byte[] value() {
+        return value;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RecordType type() {
+        return RecordType.METASTORE_DATA_RECORD;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MetastoreDataRecord.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java
index 6ee96a4..6740221 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import sun.nio.ch.DirectBuffer;
 
 /**
  *
@@ -92,9 +93,12 @@ public class PageSnapshot extends WALRecord {
                 + "],\nsuper = ["
                 + super.toString() + "]]";
         }
-        catch (IgniteCheckedException e) {
+        catch (IgniteCheckedException ignored) {
             return "Error during call'toString' of PageSnapshot [fullPageId=" + fullPageId() +
                 ", pageData = " + Arrays.toString(pageData) + ", super=" + super.toString() + "]";
         }
+        finally {
+            ((DirectBuffer)buf).cleaner().clean();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SnapshotRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SnapshotRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SnapshotRecord.java
index 3c3a77b..c6b6329 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SnapshotRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SnapshotRecord.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.pagemem.wal.record;
 
+import org.apache.ignite.internal.util.typedef.internal.S;
+
 /**
  * Wal snapshot record.
  */
@@ -49,10 +51,20 @@ public class SnapshotRecord extends WALRecord {
         return full;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean rollOver() {
+        return true;
+    }
+
     /**
      *
      */
     @Override public RecordType type() {
         return RecordType.SNAPSHOT;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(SnapshotRecord.class, this, "super", super.toString());
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TimeStampRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TimeStampRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TimeStampRecord.java
index 3f29dfd..c1b8584 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TimeStampRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TimeStampRecord.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.pagemem.wal.record;
 
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
@@ -25,6 +27,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
  */
 public abstract class TimeStampRecord extends WALRecord {
     /** Timestamp. */
+    @GridToStringInclude
     protected long timestamp;
 
     /**
@@ -54,4 +57,9 @@ public abstract class TimeStampRecord extends WALRecord {
     public long timestamp() {
         return timestamp;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(TimeStampRecord.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
index f933fa9..90df004 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.pagemem.wal.record;
 import java.util.Collection;
 import java.util.Map;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.transactions.TransactionState;
 import org.jetbrains.annotations.Nullable;
@@ -30,9 +31,11 @@ import org.jetbrains.annotations.Nullable;
  */
 public class TxRecord extends TimeStampRecord {
     /** Transaction state. */
+    @GridToStringInclude
     private TransactionState state;
 
     /** Global transaction identifier within cluster, assigned by transaction coordinator. */
+    @GridToStringInclude
     private GridCacheVersion nearXidVer;
 
     /** Transaction entries write topology version. */
@@ -42,33 +45,27 @@ public class TxRecord extends TimeStampRecord {
      * Transaction participating nodes.
      *
      * Structure:
-     * Primary node -> [Backup nodes...]
+     * Primary node -> [Backup nodes...], where nodes are identified by compact ID for some baseline topology.
      **/
-    @Nullable private Map<Object, Collection<Object>> participatingNodes;
-
-    /** If transaction is remote, primary node for this backup node. */
-    @Nullable private Object primaryNode;
+    @Nullable private Map<Short, Collection<Short>> participatingNodes;
 
     /**
      *
      * @param state Transaction state.
      * @param nearXidVer Transaction id.
      * @param writeVer Transaction entries write topology version.
-     * @param participatingNodes Primary -> Backup nodes participating in transaction.
-     * @param primaryNode Primary node.
+     * @param participatingNodes Primary -> Backup nodes compact IDs participating in transaction.
      */
     public TxRecord(
         TransactionState state,
         GridCacheVersion nearXidVer,
         GridCacheVersion writeVer,
-        @Nullable Map<Object, Collection<Object>> participatingNodes,
-        @Nullable Object primaryNode
+        @Nullable Map<Short, Collection<Short>> participatingNodes
     ) {
         this.state = state;
         this.nearXidVer = nearXidVer;
         this.writeVer = writeVer;
         this.participatingNodes = participatingNodes;
-        this.primaryNode = primaryNode;
     }
 
     /**
@@ -76,24 +73,21 @@ public class TxRecord extends TimeStampRecord {
      * @param nearXidVer Transaction id.
      * @param writeVer Transaction entries write topology version.
      * @param participatingNodes Primary -> Backup nodes participating in transaction.
-     * @param primaryNode Primary node.
-     * @param timestamp TimeStamp.
+     * @param ts TimeStamp.
      */
     public TxRecord(
         TransactionState state,
         GridCacheVersion nearXidVer,
         GridCacheVersion writeVer,
-        @Nullable Map<Object, Collection<Object>> participatingNodes,
-        @Nullable Object primaryNode,
-        long timestamp
+        @Nullable Map<Short, Collection<Short>> participatingNodes,
+        long ts
     ) {
-        super(timestamp);
+        super(ts);
 
         this.state = state;
         this.nearXidVer = nearXidVer;
         this.writeVer = writeVer;
         this.participatingNodes = participatingNodes;
-        this.primaryNode = primaryNode;
     }
 
     /** {@inheritDoc} */
@@ -144,33 +138,19 @@ public class TxRecord extends TimeStampRecord {
     }
 
     /**
-     * @return Primary -> backup participating nodes.
+     * @return Primary -> backup participating nodes compact IDs.
      */
-    public Map<Object, Collection<Object>> participatingNodes() {
+    public Map<Short, Collection<Short>> participatingNodes() {
         return participatingNodes;
     }
 
     /**
-     * @param participatingNodeIds Primary -> backup participating nodes.
+     * @param participatingNodeIds Primary -> backup participating nodes compact IDs.
      */
-    public void participatingNodes(Map<Object, Collection<Object>> participatingNodeIds) {
+    public void participatingNodes(Map<Short, Collection<Short>> participatingNodeIds) {
         this.participatingNodes = participatingNodeIds;
     }
 
-    /**
-     * @return Is transaction remote for backup.
-     */
-    public boolean remote() {
-        return primaryNode != null;
-    }
-
-    /**
-     * @return Primary node for backup if transaction is remote.
-     */
-    @Nullable public Object primaryNode() {
-        return primaryNode;
-    }
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(TxRecord.class, this, "super", super.toString());

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index 7446916..8362a69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -166,7 +166,16 @@ public abstract class WALRecord {
         PARTITION_DESTROY,
 
         /** Snapshot record. */
-        SNAPSHOT;
+        SNAPSHOT,
+
+        /** Metastore data record. */
+        METASTORE_DATA_RECORD,
+
+        /** Exchange record. */
+        EXCHANGE,
+
+        /** Baseline topology record. */
+        BASELINE_TOP_RECORD;
 
         /** */
         private static final RecordType[] VALS = RecordType.values();
@@ -241,6 +250,13 @@ public abstract class WALRecord {
     }
 
     /**
+     * @return {@code true} if this record requires a WAL rollover, {@code false} otherwise.
+     */
+    public boolean rollOver(){
+        return false;
+    }
+
+    /**
      * @return Entry type.
      */
     public abstract RecordType type();

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentRecord.java
index e07c388..5324d56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentRecord.java
@@ -19,7 +19,8 @@ package org.apache.ignite.internal.pagemem.wal.record.delta;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.pagemem.PageMemory;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
@@ -54,7 +55,7 @@ public class DataPageInsertFragmentRecord extends PageDeltaRecord {
 
     /** {@inheritDoc} */
     @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException {
-        DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr);
+        AbstractDataPageIO io = PageIO.getPageIO(pageAddr);
 
         io.addRowFragment(pageAddr, payload, lastLink, pageMem.pageSize());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertRecord.java
index f315058..2c9a8e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertRecord.java
@@ -19,7 +19,8 @@ package org.apache.ignite.internal.pagemem.wal.record.delta;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.pagemem.PageMemory;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
@@ -55,7 +56,7 @@ public class DataPageInsertRecord extends PageDeltaRecord {
     @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException {
         assert payload != null;
 
-        DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr);
+        AbstractDataPageIO io = PageIO.getPageIO(pageAddr);
 
         io.addRow(pageAddr, payload, pageMem.pageSize());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageRemoveRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageRemoveRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageRemoveRecord.java
index 484ec87..f7776be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageRemoveRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageRemoveRecord.java
@@ -19,7 +19,8 @@ package org.apache.ignite.internal.pagemem.wal.record.delta;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.pagemem.PageMemory;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
@@ -50,7 +51,7 @@ public class DataPageRemoveRecord extends PageDeltaRecord {
     /** {@inheritDoc} */
     @Override public void applyDelta(PageMemory pageMem, long pageAddr)
         throws IgniteCheckedException {
-        DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr);
+        AbstractDataPageIO io = PageIO.getPageIO(pageAddr);
 
         io.removeRow(pageAddr, itemId, pageMem.pageSize());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageSetFreeListPageRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageSetFreeListPageRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageSetFreeListPageRecord.java
index 0ade484..e679611 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageSetFreeListPageRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageSetFreeListPageRecord.java
@@ -19,7 +19,8 @@ package org.apache.ignite.internal.pagemem.wal.record.delta;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.pagemem.PageMemory;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_PAGE_SET_FREE_LIST_PAGE;
@@ -51,7 +52,7 @@ public class DataPageSetFreeListPageRecord extends PageDeltaRecord {
 
     /** {@inheritDoc} */
     @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException {
-        DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr);
+        AbstractDataPageIO io = PageIO.getPageIO(pageAddr);
 
         io.setFreeListPageId(pageAddr, freeListPage);
     }


Mime
View raw message