From commits-return-116458-archive-asf-public=cust-asf.ponee.io@ignite.apache.org Wed Jan 17 13:23:07 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 66A7B180670 for ; Wed, 17 Jan 2018 13:23:06 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 56FA0160C60; Wed, 17 Jan 2018 12:23:06 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 07CAB160C4A for ; Wed, 17 Jan 2018 13:23:03 +0100 (CET) Received: (qmail 96307 invoked by uid 500); 17 Jan 2018 12:23:03 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 96190 invoked by uid 99); 17 Jan 2018 12:23:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 17 Jan 2018 12:23:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 42341F3526; Wed, 17 Jan 2018 12:23:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: akuznetsov@apache.org To: commits@ignite.apache.org Date: Wed, 17 Jan 2018 12:23:36 -0000 Message-Id: <38bcf666373a4ea5ae662bc1ebc584bb@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [38/43] ignite git commit: IEP-4 Baseline topology for persistent caches (Phase 1) Contributed by: Dmitriy Govorukhin Dmitry Pavlov Eduard Shangareev Ily http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java index 8eaa003..cf331f8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java @@ -17,73 +17,643 @@ package org.apache.ignite.internal.commandline; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import org.apache.ignite.internal.client.GridClient; +import org.apache.ignite.internal.client.GridClientAuthenticationException; +import org.apache.ignite.internal.client.GridClientClosedException; import org.apache.ignite.internal.client.GridClientClusterState; +import org.apache.ignite.internal.client.GridClientCompute; import org.apache.ignite.internal.client.GridClientConfiguration; +import org.apache.ignite.internal.client.GridClientDisconnectedException; import org.apache.ignite.internal.client.GridClientException; import org.apache.ignite.internal.client.GridClientFactory; +import org.apache.ignite.internal.client.GridClientHandshakeException; +import org.apache.ignite.internal.client.GridClientNode; +import org.apache.ignite.internal.client.GridServerUnreachableException; +import org.apache.ignite.internal.client.impl.connection.GridClientConnectionResetException; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.visor.VisorTaskArgument; +import org.apache.ignite.internal.visor.baseline.VisorBaselineNode; +import org.apache.ignite.internal.visor.baseline.VisorBaselineOperation; +import org.apache.ignite.internal.visor.baseline.VisorBaselineTask; +import org.apache.ignite.internal.visor.baseline.VisorBaselineTaskArg; +import org.apache.ignite.internal.visor.baseline.VisorBaselineTaskResult; + +import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR; +import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT; +import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.ADD; +import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.COLLECT; +import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.REMOVE; +import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.SET; +import static org.apache.ignite.internal.visor.baseline.VisorBaselineOperation.VERSION; /** - * + * Class that execute several commands passed via command line. */ public class CommandHandler { + /** */ + private static final String DFLT_HOST = "127.0.0.1"; + + /** */ + private static final String DFLT_PORT = "11211"; + + /** */ + private static final String CMD_HELP = "--help"; + + /** */ + private static final String CMD_HOST = "--host"; + + /** */ + private static final String CMD_PORT = "--port"; + + /** */ + private static final String CMD_ACTIVATE = "--activate"; + + /** */ + private static final String CMD_DEACTIVATE = "--deactivate"; + + /** */ + private static final String CMD_STATE = "--state"; + + /** */ + private static final String CMD_BASE_LINE = "--baseline"; + + /** */ + private static final String BASELINE_ADD = "add"; + + /** */ + private static final String BASELINE_REMOVE = "remove"; + + /** */ + private static final String BASELINE_SET = "set"; + + /** */ + private static final String BASELINE_SET_VERSION = "version"; + + /** */ + private static final String DELIM = "--------------------------------------------------------------------------------"; + + /** */ + public static final int EXIT_CODE_OK = 0; + + /** */ + public static final int EXIT_CODE_INVALID_ARGUMENTS = 1; + + /** */ + public static final int EXIT_CODE_CONNECTION_FAILED = 2; + + /** */ + public static final int ERR_AUTHENTICATION_FAILED = 3; + + /** */ + public static final int EXIT_CODE_UNEXPECTED_ERROR = 4; + + /** + * Output specified string to console. + * + * @param s String to output. + */ + private void log(String s) { + System.out.println(s); + } + /** - * @param args Args. + * Output empty line. */ - public static void main(String[] args) throws GridClientException { - String host = "127.0.0.1"; - String port = "11211"; - Boolean activate = null; + private void nl() { + System.out.println(""); + } + + /** + * Print error to console. + * + * @param errCode Error code to return. + * @param s Optional message. + * @param e Error to print. + */ + private int error(int errCode, String s, Throwable e) { + if (!F.isEmpty(s)) + log(s); - if (args.length == 1 && "--help".equals(args[0])){ - System.out.println("Example: --host {ip} --port {port} --{activate/deactivate} " + - "or without command --host {ip} --port {port} then will print status."); + String msg = e.getMessage(); - return; + if (F.isEmpty(msg)) + msg = e.getClass().getName(); + + if (msg.startsWith("Failed to handle request")) { + int p = msg.indexOf("err="); + + msg = msg.substring(p + 4, msg.length() - 1); } - if (args.length > 5) - throw new IllegalArgumentException("incorrect number of arguments"); + log("Error: " + msg); + + return errCode; + } + + /** + * Print command usage. + * + * @param desc Command description. + * @param cmd Command. + */ + private void usage(String desc, String cmd) { + log(desc); + log(" control.sh [--host HOST_OR_IP] [--port PORT] " + cmd); + nl(); + } + + /** + * Extract next argument. + * + * @param it Arguments iterator. + * @param err Error message. + * @return Next argument value. + */ + private String nextArg(Iterator it, String err) { + if (it.hasNext()) { + String arg = it.next(); + + if (arg.startsWith("--")) + throw new IllegalArgumentException("Unexpected argument: " + arg); + + return arg; + } + + throw new IllegalArgumentException(err); + } + + /** + * Activate cluster. + * + * @param client Client. + * @throws GridClientException If failed to activate. + */ + private void activate(GridClient client) throws Throwable { + try { + GridClientClusterState state = client.state(); + + state.active(true); - for (int i = 0; i < args.length; i++) { - String str = args[i]; + log("Cluster activated"); + } + catch (Throwable e) { + log("Failed to activate cluster."); - if ("--host".equals(str)) - host = args[i + 1]; - else if ("--port".equals(str)) - port = args[i + 1]; - else if ("--activate".equals(str)) - activate = true; - else if ("--deactivate".equals(str)) - activate = false; + throw e; } + } - if (host == null) - throw new IllegalArgumentException("host can not be empty"); + /** + * Deactivate cluster. + * + * @param client Client. + * @throws Throwable If failed to deactivate. + */ + private void deactivate(GridClient client) throws Throwable { + try { + GridClientClusterState state = client.state(); - if (port == null) - throw new IllegalArgumentException("port can not be empty"); + state.active(false); - GridClientConfiguration cfg = new GridClientConfiguration(); - cfg.setServers(Collections.singletonList(host + ":" + port)); + log("Cluster deactivated"); + } + catch (Throwable e) { + log("Failed to deactivate cluster."); - try (GridClient client = GridClientFactory.start(cfg)) { + throw e; + } + } + + /** + * Print cluster state. + * + * @param client Client. + * @throws Throwable If failed to print state. + */ + private void state(GridClient client) throws Throwable { + try { GridClientClusterState state = client.state(); - if (activate != null) + log("Cluster is " + (state.active() ? "active" : "inactive")); + } + catch (Throwable e) { + log("Failed to get cluster state."); + + throw e; + } + } + + /** + * + * @param client Client + * @return Task result. + * @throws GridClientException If failed to execute task. + */ + private R executeTask(GridClient client, Class taskCls, Object taskArgs) throws GridClientException { + GridClientCompute compute = client.compute(); + + List nodes = new ArrayList<>(); + + for (GridClientNode node : compute.nodes()) + if (node.connectable()) + nodes.add(node); + + if (F.isEmpty(nodes)) + throw new GridClientDisconnectedException("Connectable node not found", null); + + GridClientNode node = compute.balancer().balancedNode(nodes); + + return compute.projection(node).execute(taskCls.getName(), + new VisorTaskArgument<>(node.nodeId(), taskArgs, false)); + } + + /** + * Change baseline. + * + * @param client Client. + * @param baselineAct Baseline action to execute. @throws GridClientException If failed to execute baseline action. + * @param baselineArgs Baseline action arguments. + * @throws Throwable If failed to execute baseline action. + */ + private void baseline(GridClient client, String baselineAct, String baselineArgs) throws Throwable { + switch (baselineAct) { + case BASELINE_ADD: + baselineAdd(client, baselineArgs); + break; + + case BASELINE_REMOVE: + baselineRemove(client, baselineArgs); + break; + + case BASELINE_SET: + baselineSet(client, baselineArgs); + break; + + case BASELINE_SET_VERSION: + baselineVersion(client, baselineArgs); + break; + + default: + baselinePrint(client); + } + } + + /** + * Prepare task argument. + * + * @param op Operation. + * @param s Argument from command line. + * @return Task argument. + */ + private VisorBaselineTaskArg arg(VisorBaselineOperation op, String s) { + switch (op) { + case ADD: + case REMOVE: + case SET: + if(F.isEmpty(s)) + throw new IllegalArgumentException("Empty list of consistent IDs"); + + List consistentIds = new ArrayList<>(); + + for (String consistentId : s.split(",")) + consistentIds.add(consistentId.trim()); + + return new VisorBaselineTaskArg(op, -1, consistentIds); + + case VERSION: try { - state.active(activate); + long topVer = Long.parseLong(s); - System.out.println(host + ":" + port + " - was " + (activate ? "activate" : "deactivate")); + return new VisorBaselineTaskArg(op, topVer, null); } - catch (Exception e) { - System.out.println("Something fail during " + (activate ? "activation" : "deactivation") - + ", exception message: " + e.getMessage()); + catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid topology version: " + s, e); } - else - System.out.println(host + ":" + port + " - " + (state.active() ? "active" : "inactive")); + default: + return new VisorBaselineTaskArg(op, -1, null); } } + + /** + * Print baseline topology. + * + * @param res Task result with baseline topology. + */ + private void baselinePrint0(VisorBaselineTaskResult res) { + log("Cluster state: " + (res.isActive() ? "active" : "inactive")); + log("Current topology version: " + res.getTopologyVersion()); + nl(); + + Map baseline = res.getBaseline(); + Map servers = res.getServers(); + + if (F.isEmpty(baseline)) + log("Baseline nodes not found."); + else { + log("Baseline nodes:"); + + for(VisorBaselineNode node : baseline.values()) { + log(" ConsistentID=" + node.getConsistentId() + ", STATE=" + + (servers.containsKey(node.getConsistentId()) ? "ONLINE" : "OFFLINE")); + } + + log(DELIM); + log("Number of baseline nodes: " + baseline.size()); + + nl(); + + List others = new ArrayList<>(); + + for (VisorBaselineNode node : servers.values()) { + if (!baseline.containsKey(node.getConsistentId())) + others.add(node); + } + + if (F.isEmpty(others)) + log("Other nodes not found."); + else { + log("Other nodes:"); + + for(VisorBaselineNode node : others) + log(" ConsistentID=" + node.getConsistentId()); + + log("Number of other nodes: " + others.size()); + } + } + } + + /** + * Print current baseline. + * + * @param client Client. + */ + private void baselinePrint(GridClient client) throws GridClientException { + VisorBaselineTaskResult res = executeTask(client, VisorBaselineTask.class, arg(COLLECT, "")); + + baselinePrint0(res); + } + + /** + * Add nodes to baseline. + * + * @param client Client. + * @param baselineArgs Baseline action arguments. + * @throws Throwable If failed to add nodes to baseline. + */ + private void baselineAdd(GridClient client, String baselineArgs) throws Throwable { + try { + VisorBaselineTaskResult res = executeTask(client, VisorBaselineTask.class, arg(ADD, baselineArgs)); + + baselinePrint0(res); + } + catch (Throwable e) { + log("Failed to add nodes to baseline."); + + throw e; + } + } + + /** + * Remove nodes from baseline. + * + * @param client Client. + * @param consistentIds Consistent IDs. + * @throws Throwable If failed to remove nodes from baseline. + */ + private void baselineRemove(GridClient client, String consistentIds) throws Throwable { + try { + VisorBaselineTaskResult res = executeTask(client, VisorBaselineTask.class, arg(REMOVE, consistentIds)); + + baselinePrint0(res); + } + catch (Throwable e) { + log("Failed to remove nodes from baseline."); + + throw e; + } + } + + /** + * Set baseline. + * + * @param client Client. + * @param consistentIds Consistent IDs. + * @throws Throwable If failed to set baseline. + */ + private void baselineSet(GridClient client, String consistentIds) throws Throwable { + try { + VisorBaselineTaskResult res = executeTask(client, VisorBaselineTask.class, arg(SET, consistentIds)); + + baselinePrint0(res); + } + catch (Throwable e) { + log("Failed to set baseline."); + + throw e; + } + } + + /** + * Set baseline by topology version. + * + * @param client Client. + * @param arg Argument from command line. + */ + private void baselineVersion(GridClient client, String arg) throws GridClientException { + try { + VisorBaselineTaskResult res = executeTask(client, VisorBaselineTask.class, arg(VERSION, arg)); + + baselinePrint0(res); + } + catch (Throwable e) { + log("Failed to set baseline with specified topology version."); + + throw e; + } + } + + /** + * @param e Exception to check. + * @return {@code true} if specified exception is {@link GridClientAuthenticationException}. + */ + private boolean isAuthError(Throwable e) { + return X.hasCause(e, GridClientAuthenticationException.class); + } + + /** + * @param e Exception to check. + * @return {@code true} if specified exception is a connection error. + */ + private boolean isConnectionError(Throwable e) { + return e instanceof GridClientClosedException || + e instanceof GridClientConnectionResetException || + e instanceof GridClientDisconnectedException || + e instanceof GridClientHandshakeException || + e instanceof GridServerUnreachableException; + } + + /** + * Parse and execute command. + * + * @param args Arguments to parse and execute. + * @return Exit code. + */ + public int execute(String... args) { + log("Control utility [ver. " + ACK_VER_STR + "]"); + log(COPYRIGHT); + log("User: " + System.getProperty("user.name")); + log(DELIM); + + try { + if (F.isEmpty(args) || (args.length == 1 && CMD_HELP.equalsIgnoreCase(args[0]))){ + log("This utility can do the following commands:"); + + usage(" Activate cluster:", CMD_ACTIVATE); + usage(" Deactivate cluster:", CMD_DEACTIVATE); + usage(" Print current cluster state:", CMD_STATE); + usage(" Print cluster baseline topology:", CMD_BASE_LINE); + usage(" Add nodes into baseline topology:", CMD_BASE_LINE + " add consistentId1[,consistentId2,....,consistentIdN]"); + usage(" Remove nodes from baseline topology:", CMD_BASE_LINE + " remove consistentId1[,consistentId2,....,consistentIdN]"); + usage(" Set baseline topology:", CMD_BASE_LINE + " set consistentId1[,consistentId2,....,consistentIdN]"); + usage(" Set baseline topology based on version:", CMD_BASE_LINE + " version topologyVersion"); + + log("Default values:"); + log(" HOST_OR_IP=" + DFLT_HOST); + log(" PORT=" + DFLT_PORT); + nl(); + + log("Exit codes:"); + log(" " + EXIT_CODE_OK + " - successful execution."); + log(" " + EXIT_CODE_INVALID_ARGUMENTS + " - invalid arguments."); + log(" " + EXIT_CODE_CONNECTION_FAILED + " - connection failed."); + log(" " + ERR_AUTHENTICATION_FAILED + " - authentication failed."); + log(" " + EXIT_CODE_UNEXPECTED_ERROR + " - unexpected error."); + + return EXIT_CODE_OK; + } + + String host = DFLT_HOST; + + String port = DFLT_PORT; + + String baselineAct = ""; + + String baselineArgs = ""; + + List commands = new ArrayList<>(); + + Iterator it = Arrays.asList(args).iterator(); + + while (it.hasNext()) { + String str = it.next().toLowerCase(); + + switch (str) { + case CMD_HOST: + host = nextArg(it, "Expected host name"); + break; + + case CMD_PORT: + port = nextArg(it, "Expected port number"); + + try { + int p = Integer.parseInt(port); + + if (p <= 0 || p > 65535) + throw new IllegalArgumentException("Invalid value for port: " + port); + } + catch (NumberFormatException ignored) { + throw new IllegalArgumentException("Invalid value for port: " + port); + } + break; + + case CMD_ACTIVATE: + case CMD_DEACTIVATE: + case CMD_STATE: + commands.add(str); + break; + + case CMD_BASE_LINE: + commands.add(CMD_BASE_LINE); + + if (it.hasNext()) { + baselineAct = it.next().toLowerCase(); + + if (BASELINE_ADD.equals(baselineAct) || BASELINE_REMOVE.equals(baselineAct) || + BASELINE_SET.equals(baselineAct) || BASELINE_SET_VERSION.equals(baselineAct)) + baselineArgs = nextArg(it, "Expected baseline arguments"); + else + throw new IllegalArgumentException("Unexpected argument for " + CMD_BASE_LINE + ": " + + baselineAct); + } + + } + } + + int sz = commands.size(); + + if (sz < 1) + throw new IllegalArgumentException("No action was specified"); + + if (sz > 1) + throw new IllegalArgumentException("Only one action can be specified, but found: " + sz); + + GridClientConfiguration cfg = new GridClientConfiguration(); + + cfg.setServers(Collections.singletonList(host + ":" + port)); + + try (GridClient client = GridClientFactory.start(cfg)) { + String cmd = commands.get(0); + + switch (cmd) { + case CMD_ACTIVATE: + activate(client); + break; + + case CMD_DEACTIVATE: + deactivate(client); + break; + + case CMD_STATE: + state(client); + break; + + case CMD_BASE_LINE: + baseline(client, baselineAct, baselineArgs); + break; + } + } + + return 0; + } + catch (IllegalArgumentException e) { + return error(EXIT_CODE_INVALID_ARGUMENTS, "Check arguments.", e); + } + catch (Throwable e) { + if (isAuthError(e)) + return error(ERR_AUTHENTICATION_FAILED, "Authentication error.", e); + + if (isConnectionError(e)) + return error(EXIT_CODE_CONNECTION_FAILED, "Connection to cluster failed.", e); + + + return error(EXIT_CODE_UNEXPECTED_ERROR, "", e); + } + } + + /** + * @param args Arguments to parse and apply. + */ + public static void main(String[] args) { + CommandHandler hnd = new CommandHandler(); + + System.exit(hnd.execute(args)); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java index a151eb5..74f5a10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java @@ -558,6 +558,20 @@ public abstract class GridManagerAdapter implements GridMan return null; } + @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node, DiscoveryDataBag discoData) { + for (GridComponent comp : ctx) { + if (comp.discoveryDataType() == null) + continue; + + IgniteNodeValidationResult err = comp.validateNode(node, discoData.newJoinerDiscoveryData(comp.discoveryDataType().ordinal())); + + if (err != null) + return err; + } + + return null; + } + @Override public Collection authenticatedSubjects() { try { return ctx.security().authenticatedSubjects(); @@ -705,6 +719,11 @@ public abstract class GridManagerAdapter implements GridMan } /** {@inheritDoc} */ + @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode node, DiscoveryDataBag.JoiningNodeDiscoveryData discoData) { + return null; + } + + /** {@inheritDoc} */ @Override public final String toString() { return S.toString(GridManagerAdapter.class, this, "name", getClass().getName()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java index c524331..59f773d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/ConsistentIdMapper.java @@ -19,10 +19,11 @@ package org.apache.ignite.internal.managers.discovery; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Map; import java.util.UUID; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cluster.BaselineTopology; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; @@ -31,71 +32,104 @@ import org.jetbrains.annotations.Nullable; */ public class ConsistentIdMapper { /** Discovery manager. */ - private final GridDiscoveryManager discoveryManager; + private final GridDiscoveryManager discoveryMgr; /** * Create an instance of mapper. * - * @param discoveryManager Discovery manager. + * @param discoveryMgr Discovery manager. */ - public ConsistentIdMapper(GridDiscoveryManager discoveryManager) { - this.discoveryManager = discoveryManager; + public ConsistentIdMapper(GridDiscoveryManager discoveryMgr) { + this.discoveryMgr = discoveryMgr; } /** - * Map UUID to consistent id. + * Maps UUID to compact ID for given baseline topology. * * @param topVer Topology version. * @param nodeId UUID of node. - * @return Consistent id of node. + * @return Compact ID of node for given baseline topology. */ - public Object mapToConsistentId(AffinityTopologyVersion topVer, UUID nodeId) { - ClusterNode node = discoveryManager.node(topVer, nodeId); + public short mapToCompactId(AffinityTopologyVersion topVer, UUID nodeId) { + Map m = discoveryMgr.consistentId(topVer); - if (node == null) - throw new IllegalStateException("Unable to find node by UUID [nodeId=" + nodeId + ", topVer=" + topVer + ']'); + if (m == null) + throw new IllegalStateException("Unable to find consistent id map [topVer" + topVer + ']'); - return node.consistentId(); + Short constId = m.get(nodeId); + + if (constId == null) + throw new IllegalStateException("Unable to find consistentId by UUID [nodeId=" + nodeId + ", topVer=" + topVer + ']'); + + return constId; } /** - * Map consistent id to UUID. + * Maps UUID to compact ID for given baseline topology. * - * @param consistentId Consistent id of node. - * @return UUID of node. + * @param topVer Topology version. + * @param nodeConstId UUID of node. + * @return Compact ID of node for given baseline topology. */ - @Nullable public UUID mapToUUID(Object consistentId) { - for (ClusterNode node : discoveryManager.allNodes()) - if (node.consistentId().equals(consistentId)) - return node.id(); + public UUID mapToUuid(AffinityTopologyVersion topVer, short nodeConstId) { + Map map = discoveryMgr.nodeIdMap(topVer); + + if (map == null) + return null; + + UUID constId = map.get(nodeConstId); - return null; + if (constId == null) + throw new IllegalStateException("Unable to find UUID by constId [nodeId=" + nodeConstId + ", topVer=" + topVer + ']'); + + return constId; } /** - * Map primary -> backup node UUIDs to consistent ids. + * Map primary -> backup node compact ID accordingly to baseline topology.. * * @param txNodes Primary -> backup UUID nodes. - * @return Primary -> backup consistent id nodes. + * @return Primary -> backup compact ID nodes. */ - public Map> mapToConsistentIds(AffinityTopologyVersion topVer, @Nullable Map> txNodes) { + public Map> mapToCompactIds( + AffinityTopologyVersion topVer, + @Nullable Map> txNodes, + BaselineTopology baselineTop + ) { if (txNodes == null) return null; - Map> consistentMap = U.newHashMap(txNodes.keySet().size()); + Map constIdMap = baselineTop.consistentIdMapping(); + + Map m = discoveryMgr.consistentId(topVer); - for (UUID node : txNodes.keySet()) { - Collection backupNodes = txNodes.get(node); + int bltNodes = m.size(); - Collection consistentIdsBackups = new ArrayList<>(backupNodes.size()); + Map> consistentMap = U.newHashMap(txNodes.size()); - for (UUID backup : backupNodes) - consistentIdsBackups.add(mapToConsistentId(topVer, backup)); + int nodeCnt = 0; - consistentMap.put(mapToConsistentId(topVer, node), consistentIdsBackups); + for (Map.Entry> e : txNodes.entrySet()) { + UUID node = e.getKey(); + + Collection backupNodes = e.getValue(); + + Collection backups = new ArrayList<>(backupNodes.size()); + + for (UUID backup : backupNodes) { + if (m.containsKey(backup)) + nodeCnt++; + + backups.add(mapToCompactId(topVer, backup)); + } + + // Optimization for short store full nodes set. + if (backups.size() == nodeCnt && nodeCnt == (bltNodes - 1)) + backups = Collections.singletonList(Short.MAX_VALUE); + + consistentMap.put(mapToCompactId(topVer, node), backups); } return consistentMap; } - } http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java index 9ed70aa..aa47168 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java @@ -23,11 +23,13 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.ignite.cluster.BaselineNode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -40,6 +42,13 @@ import org.jetbrains.annotations.Nullable; */ public class DiscoCache { /** */ + private static final C1 BASELINE_TO_CLUSTER = new C1() { + @Override public ClusterNode apply(BaselineNode baselineNode) { + return (ClusterNode)baselineNode; + } + }; + + /** */ private final DiscoveryDataClusterState state; /** Local node. */ @@ -57,6 +66,9 @@ public class DiscoCache { /** Daemon nodes. */ private final List daemonNodes; + /** Baseline nodes. */ + private final List baselineNodes; + /** All remote nodes with at least one cache configured. */ @GridToStringInclude private final List rmtNodesWithCaches; @@ -81,6 +93,18 @@ public class DiscoCache { /** */ private final AffinityTopologyVersion topVer; + /** */ + final Map nodeIdToConsIdx; + + /** */ + final Map consIdxToNodeId; + + /** */ + private final P1 aliveBaselineNodePred; + + /** */ + private final P1 aliveNodePred; + /** * @param topVer Topology version. * @param state Current cluster state. @@ -90,10 +114,13 @@ public class DiscoCache { * @param srvNodes Server nodes. * @param daemonNodes Daemon nodes. * @param rmtNodesWithCaches Remote nodes with at least one cache configured. + * @param baselineNodes Baseline nodes or {@code null} if baseline was not set. * @param allCacheNodes Cache nodes by cache name. * @param cacheGrpAffNodes Affinity nodes by cache group ID. * @param nodeMap Node map. - * @param alives Alive nodes. + * @param alives0 Alive nodes. + * @param nodeIdToConsIdx Node ID to consistent ID mapping. + * @param consIdxToNodeId Consistent ID to node ID mapping. * @param minNodeVer Minimum node version. */ DiscoCache( @@ -105,11 +132,15 @@ public class DiscoCache { List srvNodes, List daemonNodes, List rmtNodesWithCaches, + @Nullable List baselineNodes, Map> allCacheNodes, Map> cacheGrpAffNodes, Map nodeMap, - Set alives, - IgniteProductVersion minNodeVer) { + Set alives0, + @Nullable Map nodeIdToConsIdx, + @Nullable Map consIdxToNodeId, + IgniteProductVersion minNodeVer + ) { this.topVer = topVer; this.state = state; this.loc = loc; @@ -118,11 +149,28 @@ public class DiscoCache { this.srvNodes = srvNodes; this.daemonNodes = daemonNodes; this.rmtNodesWithCaches = rmtNodesWithCaches; + this.baselineNodes = baselineNodes; this.allCacheNodes = allCacheNodes; this.cacheGrpAffNodes = cacheGrpAffNodes; this.nodeMap = nodeMap; - this.alives.addAll(alives); + alives.addAll(alives0); this.minNodeVer = minNodeVer; + this.nodeIdToConsIdx = nodeIdToConsIdx; + this.consIdxToNodeId = consIdxToNodeId; + + aliveBaselineNodePred = new P1() { + @Override + public boolean apply(BaselineNode node) { + return node instanceof ClusterNode && alives.contains(((ClusterNode)node).id()); + + } + }; + + aliveNodePred = new P1() { + @Override public boolean apply(ClusterNode node) { + return alives.contains(node.id()); + } + }; } /** @@ -156,6 +204,15 @@ public class DiscoCache { return rmtNodes; } + /** + * Returns a collection of baseline nodes. + * + * @return A collection of baseline nodes or {@code null} if baseline topology was not set. + */ + @Nullable public List baselineNodes() { + return baselineNodes; + } + /** @return All nodes. */ public List allNodes() { return allNodes; @@ -171,17 +228,23 @@ public class DiscoCache { return daemonNodes; } + /** @return Consistent id map UUID -> Short (compacted consistent id). */ + public Map consistentIdMap() { + return nodeIdToConsIdx; + } + + /** @return Consistent id map Short (compacted consistent id) -> UUID. */ + public Map nodeIdMap() { + return consIdxToNodeId; + } + /** * Gets all alive remote nodes that have at least one cache configured. * * @return Collection of nodes. */ public Collection remoteAliveNodesWithCaches() { - return F.view(rmtNodesWithCaches, new P1() { - @Override public boolean apply(ClusterNode node) { - return alives.contains(node.id()); - } - }); + return F.view(rmtNodesWithCaches, aliveNodePred); } /** @@ -190,17 +253,26 @@ public class DiscoCache { * @return Collection of nodes. */ public Collection aliveServerNodes() { - return F.view(serverNodes(), new P1() { - @Override public boolean apply(ClusterNode node) { - return alives.contains(node.id()); - } - }); + return F.view(serverNodes(), aliveNodePred); + } + + /** + * Returns a collection of live baseline nodes. + * + * @return A view of baseline nodes that are currently present in the cluster or {@code null} if baseline + * topology was not set. + */ + @Nullable public Collection aliveBaselineNodes() { + return baselineNodes == null ? null : F.viewReadOnly(baselineNodes, BASELINE_TO_CLUSTER, aliveBaselineNodePred); + } /** * @return Oldest alive server node. */ - public @Nullable ClusterNode oldestAliveServerNode(){ + @SuppressWarnings("ForLoopReplaceableByForEach") + @Nullable public ClusterNode oldestAliveServerNode(){ + // Avoid iterator allocation. for (int i = 0; i < srvNodes.size(); i++) { ClusterNode srv = srvNodes.get(i); @@ -330,10 +402,13 @@ public class DiscoCache { srvNodes, daemonNodes, rmtNodesWithCaches, + baselineNodes, allCacheNodes, cacheGrpAffNodes, nodeMap, alives, + nodeIdToConsIdx, + consIdxToNodeId, minNodeVer); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 088c9ce..65cc666 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -50,6 +50,7 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cluster.BaselineNode; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; @@ -59,10 +60,10 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridComponent; import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.GridNodeOrderComparator; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.cluster.NodeOrderComparator; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.GridManagerAdapter; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; @@ -74,12 +75,12 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.persistence.DataRegion; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cluster.BaselineTopology; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; -import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor; +import org.apache.ignite.internal.processors.cluster.IGridClusterStateProcessor; import org.apache.ignite.internal.processors.security.SecurityContext; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.GridAtomicLong; @@ -705,11 +706,19 @@ public class GridDiscoveryManager extends GridManagerAdapter { Snapshot snapshot = topSnap.get(); if (customMsg == null) { - discoCache = createDiscoCache(nextTopVer, + discoCache = createDiscoCache( + nextTopVer, ctx.state().clusterState(), locNode, topSnapshot); } + else if (customMsg instanceof ChangeGlobalStateMessage) { + discoCache = createDiscoCache( + nextTopVer, + ctx.state().pendingState((ChangeGlobalStateMessage)customMsg), + locNode, + topSnapshot); + } else discoCache = customMsg.createDiscoCache(GridDiscoveryManager.this, nextTopVer, snapshot.discoCache); @@ -784,7 +793,7 @@ public class GridDiscoveryManager extends GridManagerAdapter { topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, createDiscoCache(AffinityTopologyVersion.ZERO, ctx.state().clusterState(), locNode, - Collections.singleton(locNode)) + Collections.singleton(locNode)) )); } else if (type == EVT_CLIENT_NODE_RECONNECTED) { @@ -846,7 +855,7 @@ public class GridDiscoveryManager extends GridManagerAdapter { if (ctx.localNodeId().equals(dataBag.joiningNodeId())) { // NodeAdded msg reached joining node after round-trip over the ring. - GridClusterStateProcessor stateProc = ctx.state(); + IGridClusterStateProcessor stateProc = ctx.state(); stateProc.onGridDataReceived(dataBag.gridDiscoveryData( stateProc.discoveryDataType().ordinal())); @@ -858,7 +867,7 @@ public class GridDiscoveryManager extends GridManagerAdapter { } else { // Discovery data from newly joined node has to be applied to the current old node. - GridClusterStateProcessor stateProc = ctx.state(); + IGridClusterStateProcessor stateProc = ctx.state(); JoiningNodeDiscoveryData data0 = dataBag.newJoinerDiscoveryData( stateProc.discoveryDataType().ordinal()); @@ -1810,6 +1819,15 @@ public class GridDiscoveryManager extends GridManagerAdapter { } /** + * @param topVer Topology version. + * @return All baseline nodes for given topology version or {@code null} if baseline was not set for the + * given topology version. + */ + @Nullable public List baselineNodes(AffinityTopologyVersion topVer) { + return resolveDiscoCache(CU.cacheId(null), topVer).baselineNodes(); + } + + /** * Gets node from history for given topology version. * * @param topVer Topology version. @@ -1821,6 +1839,26 @@ public class GridDiscoveryManager extends GridManagerAdapter { } /** + * Gets consistentId from history for given topology version. + * + * @param topVer Topology version. + * @return Compacted consistent id. + */ + public Map consistentId(AffinityTopologyVersion topVer) { + return resolveDiscoCache(CU.cacheId(null), topVer).consistentIdMap(); + } + + /** + * Gets consistentId from history for given topology version. + * + * @param topVer Topology version. + * @return Compacted consistent id map. + */ + public Map nodeIdMap(AffinityTopologyVersion topVer) { + return resolveDiscoCache(CU.cacheId(null), topVer).nodeIdMap(); + } + + /** * Gets cache nodes for cache with given name. * * @param cacheName Cache name. @@ -2004,7 +2042,7 @@ public class GridDiscoveryManager extends GridManagerAdapter { /** * @return Consistent ID. - * @deprecated Use PdsConsistentIdProcessor to get actual consistent ID + * @deprecated Use {@link ClusterNode#consistentId()} of local node to get actual consistent ID. */ @Deprecated public Serializable consistentId() { @@ -2206,6 +2244,10 @@ public class GridDiscoveryManager extends GridManagerAdapter { ArrayList rmtNodes = new ArrayList<>(topSnapshot.size()); ArrayList allNodes = new ArrayList<>(topSnapshot.size()); + Map nodeIdToConsIdx; + Map consIdxToNodeId; + List baselineNodes; + IgniteProductVersion minVer = null; for (ClusterNode node : topSnapshot) { @@ -2237,10 +2279,52 @@ public class GridDiscoveryManager extends GridManagerAdapter { Map> allCacheNodes = U.newHashMap(allNodes.size()); Map> cacheGrpAffNodes = U.newHashMap(allNodes.size()); - Set rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); + Set rmtNodesWithCaches = new TreeSet<>(NodeOrderComparator.getInstance()); fillAffinityNodeCaches(allNodes, allCacheNodes, cacheGrpAffNodes, rmtNodesWithCaches); + BaselineTopology blt = state.baselineTopology(); + + if (blt != null) { + nodeIdToConsIdx = U.newHashMap(srvNodes.size()); + consIdxToNodeId = U.newHashMap(srvNodes.size()); + + Map m = blt.consistentIdMapping(); + + Map aliveNodesByConsId = U.newHashMap(srvNodes.size()); + + for (ClusterNode node : srvNodes) { + Short compactedId = m.get(node.consistentId()); + + if (compactedId != null) { + nodeIdToConsIdx.put(node.id(), compactedId); + + consIdxToNodeId.put(compactedId, node.id()); + } + + aliveNodesByConsId.put(node.consistentId(), node); + } + + ListbaselineNodes0 = new ArrayList<>(blt.size()); + + for (Object consId : blt.consistentIds()) { + ClusterNode srvNode = aliveNodesByConsId.get(consId); + + if (srvNode != null) + baselineNodes0.add(srvNode); + else + baselineNodes0.add(blt.baselineNode(consId)); + } + + baselineNodes = baselineNodes0; + } + else { + nodeIdToConsIdx = null; + consIdxToNodeId = null; + + baselineNodes = null; + } + return new DiscoCache( topVer, state, @@ -2250,10 +2334,13 @@ public class GridDiscoveryManager extends GridManagerAdapter { Collections.unmodifiableList(srvNodes), Collections.unmodifiableList(daemonNodes), U.sealList(rmtNodesWithCaches), + baselineNodes == null ? null : Collections.unmodifiableList(baselineNodes), Collections.unmodifiableMap(allCacheNodes), Collections.unmodifiableMap(cacheGrpAffNodes), Collections.unmodifiableMap(nodeMap), alives, + nodeIdToConsIdx == null ? null : Collections.unmodifiableMap(nodeIdToConsIdx), + consIdxToNodeId == null ? null : Collections.unmodifiableMap(consIdxToNodeId), minVer); } @@ -2364,8 +2451,12 @@ public class GridDiscoveryManager extends GridManagerAdapter { discoWrk.addEvent(EVT_NODE_SEGMENTED, AffinityTopologyVersion.NONE, node, - createDiscoCache(AffinityTopologyVersion.NONE, null, node, locNodeOnlyTop), - locNodeOnlyTop, + createDiscoCache( + AffinityTopologyVersion.NONE, + ctx.state().clusterState(), + node, + locNodeOnlyTop + ), locNodeOnlyTop, null); lastSegChkRes.set(false); @@ -3056,12 +3147,14 @@ public class GridDiscoveryManager extends GridManagerAdapter { * @param discoCache Current disco cache. * @return New discovery cache. */ - public DiscoCache createDiscoCacheOnCacheChange(AffinityTopologyVersion topVer, - DiscoCache discoCache) { + public DiscoCache createDiscoCacheOnCacheChange( + AffinityTopologyVersion topVer, + DiscoCache discoCache + ) { List allNodes = discoCache.allNodes(); Map> allCacheNodes = U.newHashMap(allNodes.size()); Map> cacheGrpAffNodes = U.newHashMap(allNodes.size()); - Set rmtNodesWithCaches = new TreeSet<>(GridNodeOrderComparator.INSTANCE); + Set rmtNodesWithCaches = new TreeSet<>(NodeOrderComparator.getInstance()); fillAffinityNodeCaches(allNodes, allCacheNodes, cacheGrpAffNodes, rmtNodesWithCaches); @@ -3074,10 +3167,13 @@ public class GridDiscoveryManager extends GridManagerAdapter { discoCache.serverNodes(), discoCache.daemonNodes(), U.sealList(rmtNodesWithCaches), + discoCache.baselineNodes(), allCacheNodes, cacheGrpAffNodes, discoCache.nodeMap, discoCache.alives, + discoCache.nodeIdToConsIdx, + discoCache.consIdxToNodeId, discoCache.minimumNodeVersion()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/mem/unsafe/UnsafeMemoryProvider.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/mem/unsafe/UnsafeMemoryProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/mem/unsafe/UnsafeMemoryProvider.java index bf6e807..276e10e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/mem/unsafe/UnsafeMemoryProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/mem/unsafe/UnsafeMemoryProvider.java @@ -57,13 +57,15 @@ public class UnsafeMemoryProvider implements DirectMemoryProvider { /** {@inheritDoc} */ @Override public void shutdown() { - for (Iterator it = regions.iterator(); it.hasNext(); ) { - DirectMemoryRegion chunk = it.next(); + if (regions != null) { + for (Iterator it = regions.iterator(); it.hasNext(); ) { + DirectMemoryRegion chunk = it.next(); - GridUnsafe.freeMemory(chunk.address()); + GridUnsafe.freeMemory(chunk.address()); - // Safety. - it.remove(); + // Safety. + it.remove(); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java index 64c5927..6802a3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/IgnitePageStoreManager.java @@ -52,6 +52,11 @@ public interface IgnitePageStoreManager extends GridCacheSharedManager, IgniteCh throws IgniteCheckedException; /** + * Initializes disk cache store structures. + */ + public void initializeForMetastorage() throws IgniteCheckedException; + + /** * Callback called when a cache is stopping. After this callback is invoked, no data associated with * the given cache will be stored on disk. * http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java index 42d9611..19b47e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java @@ -96,10 +96,11 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni * the given pointer will be kept because there is a configurable WAL history size. Those entries may be used * for partial partition rebalancing. * - * @param ptr Pointer for which it is safe to clear the log. + * @param low Pointer since which WAL will be truncated. If null, WAL will be truncated from the oldest segment. + * @param high Pointer for which it is safe to clear the log. * @return Number of deleted WAL segments. */ - public int truncate(WALPointer ptr); + public int truncate(WALPointer low, WALPointer high); /** * Gives a hint to WAL manager to compact WAL until given pointer (exclusively). http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/BaselineTopologyRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/BaselineTopologyRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/BaselineTopologyRecord.java new file mode 100644 index 0000000..48b60b3 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/BaselineTopologyRecord.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagemem.wal.record; + +import java.util.Map; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Record for storing baseline topology compact node ID to consistent node ID mapping. + */ +public class BaselineTopologyRecord extends WALRecord { + /** Id. */ + private int id; + + /** Compact ID to consistent ID mapping. */ + private Map mapping; + + /** + * Default constructor. + */ + private BaselineTopologyRecord() { + // No-op, used from factory methods. + } + + /** + * @param id Baseline topology ID. + * @param mapping Compact ID to consistent ID mapping. + */ + public BaselineTopologyRecord(int id, Map mapping) { + this.id = id; + this.mapping = mapping; + } + + /** {@inheritDoc} */ + @Override public RecordType type() { + return RecordType.BASELINE_TOP_RECORD; + } + + /** + * Returns baseline topology ID. + * + * @return Baseline topology ID. + */ + public int id() { + return id; + } + + /** + * Returns mapping. + * + * @return Compact ID to consistent ID mapping. + */ + public Map mapping() { + return mapping; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(BaselineTopologyRecord.class, this, "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CacheState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CacheState.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CacheState.java index 41d38d0..49025c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CacheState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CacheState.java @@ -30,6 +30,9 @@ public class CacheState { private long[] vals; /** */ + private byte[] states; + + /** */ private int idx; /** @@ -38,6 +41,7 @@ public class CacheState { public CacheState(int partsCnt) { parts = new int[partsCnt]; vals = new long[partsCnt * 2]; + states = new byte[partsCnt]; } /** @@ -46,6 +50,16 @@ public class CacheState { * @param cntr Partition counter. */ public void addPartitionState(int partId, long size, long cntr) { + addPartitionState(partId, size, cntr, (byte)-1); + } + + /** + * @param partId Partition ID to add. + * @param size Partition size. + * @param cntr Partition counter. + * @param state Partition state. + */ + public void addPartitionState(int partId, long size, long cntr, byte state) { if (idx == parts.length) throw new IllegalStateException("Failed to add new partition to the partitions state " + "(no enough space reserved) [partId=" + partId + ", reserved=" + parts.length + ']'); @@ -57,6 +71,8 @@ public class CacheState { } parts[idx] = partId; + states[idx] = state; + vals[2 * idx] = size; vals[2 * idx + 1] = cntr; @@ -97,6 +113,14 @@ public class CacheState { /** * @param idx Index to get. + * @return State partition. + */ + public byte stateByIndex(int idx) { + return states[idx]; + } + + /** + * @param idx Index to get. * @return Partition size by index. */ public long partitionSizeByIndex(int idx) { http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java index cb6b482..3511aff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataEntry.java @@ -97,8 +97,8 @@ public class DataEntry { this.partId = partId; this.partCnt = partCnt; - // Only CREATE, UPDATE and DELETE operations should be stored in WAL. - assert op == GridCacheOperation.CREATE || op == GridCacheOperation.UPDATE || op == GridCacheOperation.DELETE : op; + // Only READ, CREATE, UPDATE and DELETE operations should be stored in WAL. + assert op == GridCacheOperation.READ || op == GridCacheOperation.CREATE || op == GridCacheOperation.UPDATE || op == GridCacheOperation.DELETE : op; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java index ac569bd..7a4d6b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/DataRecord.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.List; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; /** * Logical data record with cache operation description. @@ -48,19 +49,18 @@ public class DataRecord extends TimeStampRecord { * @param writeEntry Write entry. */ public DataRecord(DataEntry writeEntry) { - this(Collections.singletonList(writeEntry)); + this(writeEntry, U.currentTimeMillis()); } /** * @param writeEntries Write entries. */ public DataRecord(List writeEntries) { - this.writeEntries = writeEntries; + this(writeEntries, U.currentTimeMillis()); } /** * @param writeEntry Write entry. - * @param timestamp TimeStamp. */ public DataRecord(DataEntry writeEntry, long timestamp) { this(Collections.singletonList(writeEntry), timestamp); http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/ExchangeRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/ExchangeRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/ExchangeRecord.java new file mode 100644 index 0000000..87bc4e6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/ExchangeRecord.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagemem.wal.record; + +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Partition map exchange WAL record. + */ +public class ExchangeRecord extends TimeStampRecord { + /** Event. */ + private Short constId; + + /** Type. */ + private Type type; + + /** + * @param constId Const id. + * @param type Type. + * @param timeStamp TimeStamp. + */ + public ExchangeRecord(Short constId, Type type, long timeStamp) { + super(timeStamp); + + this.constId = constId; + this.type = type; + } + + /** + * @param constId Const id. + * @param type Type. + */ + public ExchangeRecord(Short constId, Type type) { + this.constId = constId; + this.type = type; + } + + /** {@inheritDoc} */ + @Override public RecordType type() { + return RecordType.EXCHANGE; + } + + /** + * + */ + public Short getConstId() { + return constId; + } + + /** + * + */ + public Type getType() { + return type; + } + + /** + * + */ + public enum Type { + /** Join. */ + JOIN, + /** Left. */ + LEFT + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ExchangeRecord.class, this, "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java index 519e825..e077e5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/FilteredRecord.java @@ -24,6 +24,9 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRe * {@link AbstractWalRecordsIterator}. */ public class FilteredRecord extends WALRecord { + /** Instance. */ + public static final FilteredRecord INSTANCE = new FilteredRecord(); + /** {@inheritDoc} */ @Override public RecordType type() { return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MetastoreDataRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MetastoreDataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MetastoreDataRecord.java new file mode 100644 index 0000000..e269de2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/MetastoreDataRecord.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.ignite.internal.pagemem.wal.record; + +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class MetastoreDataRecord extends WALRecord { + /** */ + private final String key; + + /** */ + @Nullable private final byte[] value; + + /** + * @param key Key. + * @param value Value. + */ + public MetastoreDataRecord(String key, @Nullable byte[] value) { + this.key = key; + this.value = value; + } + + /** */ + public String key() { + return key; + } + + /** */ + @Nullable public byte[] value() { + return value; + } + + /** {@inheritDoc} */ + @Override public RecordType type() { + return RecordType.METASTORE_DATA_RECORD; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MetastoreDataRecord.class, this, "super", super.toString()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java index 6ee96a4..6740221 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import sun.nio.ch.DirectBuffer; /** * @@ -92,9 +93,12 @@ public class PageSnapshot extends WALRecord { + "],\nsuper = [" + super.toString() + "]]"; } - catch (IgniteCheckedException e) { + catch (IgniteCheckedException ignored) { return "Error during call'toString' of PageSnapshot [fullPageId=" + fullPageId() + ", pageData = " + Arrays.toString(pageData) + ", super=" + super.toString() + "]"; } + finally { + ((DirectBuffer)buf).cleaner().clean(); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SnapshotRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SnapshotRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SnapshotRecord.java index 3c3a77b..c6b6329 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SnapshotRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/SnapshotRecord.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.pagemem.wal.record; +import org.apache.ignite.internal.util.typedef.internal.S; + /** * Wal snapshot record. */ @@ -49,10 +51,20 @@ public class SnapshotRecord extends WALRecord { return full; } + /** {@inheritDoc} */ + @Override public boolean rollOver() { + return true; + } + /** * */ @Override public RecordType type() { return RecordType.SNAPSHOT; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(SnapshotRecord.class, this, "super", super.toString()); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TimeStampRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TimeStampRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TimeStampRecord.java index 3f29dfd..c1b8584 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TimeStampRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TimeStampRecord.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.pagemem.wal.record; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; /** @@ -25,6 +27,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; */ public abstract class TimeStampRecord extends WALRecord { /** Timestamp. */ + @GridToStringInclude protected long timestamp; /** @@ -54,4 +57,9 @@ public abstract class TimeStampRecord extends WALRecord { public long timestamp() { return timestamp; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TimeStampRecord.class, this); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java index f933fa9..90df004 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/TxRecord.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.pagemem.wal.record; import java.util.Collection; import java.util.Map; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; @@ -30,9 +31,11 @@ import org.jetbrains.annotations.Nullable; */ public class TxRecord extends TimeStampRecord { /** Transaction state. */ + @GridToStringInclude private TransactionState state; /** Global transaction identifier within cluster, assigned by transaction coordinator. */ + @GridToStringInclude private GridCacheVersion nearXidVer; /** Transaction entries write topology version. */ @@ -42,33 +45,27 @@ public class TxRecord extends TimeStampRecord { * Transaction participating nodes. * * Structure: - * Primary node -> [Backup nodes...] + * Primary node -> [Backup nodes...], where nodes are identified by compact ID for some baseline topology. **/ - @Nullable private Map> participatingNodes; - - /** If transaction is remote, primary node for this backup node. */ - @Nullable private Object primaryNode; + @Nullable private Map> participatingNodes; /** * * @param state Transaction state. * @param nearXidVer Transaction id. * @param writeVer Transaction entries write topology version. - * @param participatingNodes Primary -> Backup nodes participating in transaction. - * @param primaryNode Primary node. + * @param participatingNodes Primary -> Backup nodes compact IDs participating in transaction. */ public TxRecord( TransactionState state, GridCacheVersion nearXidVer, GridCacheVersion writeVer, - @Nullable Map> participatingNodes, - @Nullable Object primaryNode + @Nullable Map> participatingNodes ) { this.state = state; this.nearXidVer = nearXidVer; this.writeVer = writeVer; this.participatingNodes = participatingNodes; - this.primaryNode = primaryNode; } /** @@ -76,24 +73,21 @@ public class TxRecord extends TimeStampRecord { * @param nearXidVer Transaction id. * @param writeVer Transaction entries write topology version. * @param participatingNodes Primary -> Backup nodes participating in transaction. - * @param primaryNode Primary node. - * @param timestamp TimeStamp. + * @param ts TimeStamp. */ public TxRecord( TransactionState state, GridCacheVersion nearXidVer, GridCacheVersion writeVer, - @Nullable Map> participatingNodes, - @Nullable Object primaryNode, - long timestamp + @Nullable Map> participatingNodes, + long ts ) { - super(timestamp); + super(ts); this.state = state; this.nearXidVer = nearXidVer; this.writeVer = writeVer; this.participatingNodes = participatingNodes; - this.primaryNode = primaryNode; } /** {@inheritDoc} */ @@ -144,33 +138,19 @@ public class TxRecord extends TimeStampRecord { } /** - * @return Primary -> backup participating nodes. + * @return Primary -> backup participating nodes compact IDs. */ - public Map> participatingNodes() { + public Map> participatingNodes() { return participatingNodes; } /** - * @param participatingNodeIds Primary -> backup participating nodes. + * @param participatingNodeIds Primary -> backup participating nodes compact IDs. */ - public void participatingNodes(Map> participatingNodeIds) { + public void participatingNodes(Map> participatingNodeIds) { this.participatingNodes = participatingNodeIds; } - /** - * @return Is transaction remote for backup. - */ - public boolean remote() { - return primaryNode != null; - } - - /** - * @return Primary node for backup if transaction is remote. - */ - @Nullable public Object primaryNode() { - return primaryNode; - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(TxRecord.class, this, "super", super.toString()); http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java index 7446916..8362a69 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java @@ -166,7 +166,16 @@ public abstract class WALRecord { PARTITION_DESTROY, /** Snapshot record. */ - SNAPSHOT; + SNAPSHOT, + + /** Metastore data record. */ + METASTORE_DATA_RECORD, + + /** Exchange record. */ + EXCHANGE, + + /** Baseline topology record. */ + BASELINE_TOP_RECORD; /** */ private static final RecordType[] VALS = RecordType.values(); @@ -241,6 +250,13 @@ public abstract class WALRecord { } /** + * @return Need wal rollOver. + */ + public boolean rollOver(){ + return false; + } + + /** * @return Entry type. */ public abstract RecordType type(); http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentRecord.java index e07c388..5324d56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertFragmentRecord.java @@ -19,7 +19,8 @@ package org.apache.ignite.internal.pagemem.wal.record.delta; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.PageMemory; -import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -54,7 +55,7 @@ public class DataPageInsertFragmentRecord extends PageDeltaRecord { /** {@inheritDoc} */ @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException { - DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr); + AbstractDataPageIO io = PageIO.getPageIO(pageAddr); io.addRowFragment(pageAddr, payload, lastLink, pageMem.pageSize()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertRecord.java index f315058..2c9a8e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageInsertRecord.java @@ -19,7 +19,8 @@ package org.apache.ignite.internal.pagemem.wal.record.delta; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.PageMemory; -import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.util.typedef.internal.S; /** @@ -55,7 +56,7 @@ public class DataPageInsertRecord extends PageDeltaRecord { @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException { assert payload != null; - DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr); + AbstractDataPageIO io = PageIO.getPageIO(pageAddr); io.addRow(pageAddr, payload, pageMem.pageSize()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageRemoveRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageRemoveRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageRemoveRecord.java index 484ec87..f7776be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageRemoveRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageRemoveRecord.java @@ -19,7 +19,8 @@ package org.apache.ignite.internal.pagemem.wal.record.delta; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.PageMemory; -import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.util.typedef.internal.S; /** @@ -50,7 +51,7 @@ public class DataPageRemoveRecord extends PageDeltaRecord { /** {@inheritDoc} */ @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException { - DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr); + AbstractDataPageIO io = PageIO.getPageIO(pageAddr); io.removeRow(pageAddr, itemId, pageMem.pageSize()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6f7aba85/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageSetFreeListPageRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageSetFreeListPageRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageSetFreeListPageRecord.java index 0ade484..e679611 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageSetFreeListPageRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageSetFreeListPageRecord.java @@ -19,7 +19,8 @@ package org.apache.ignite.internal.pagemem.wal.record.delta; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.pagemem.PageMemory; -import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.util.typedef.internal.S; import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_PAGE_SET_FREE_LIST_PAGE; @@ -51,7 +52,7 @@ public class DataPageSetFreeListPageRecord extends PageDeltaRecord { /** {@inheritDoc} */ @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException { - DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr); + AbstractDataPageIO io = PageIO.getPageIO(pageAddr); io.setFreeListPageId(pageAddr, freeListPage); }