ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [2/3] ignite git commit: IGNITE-7003: Ability to change WAL mode in runtime. This closes #3335.
Date Mon, 22 Jan 2018 19:10:53 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/95d38643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
new file mode 100644
index 0000000..0ac699f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
@@ -0,0 +1,914 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.processors.cache.persistence.CheckpointFuture;
+import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.ignite.internal.GridTopic.TOPIC_WAL;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+
+/**
+ * Write-ahead log state manager. Manages WAL enable and disable.
+ */
+public class WalStateManager extends GridCacheSharedManagerAdapter {
+    /** History size for to track stale messages. */
+    private static final int HIST_SIZE = 1000;
+
+    /** ID history for discovery messages. */
+    private final GridBoundedConcurrentLinkedHashSet<T2<UUID, Boolean>> discoMsgIdHist =
+        new GridBoundedConcurrentLinkedHashSet<>(HIST_SIZE);
+
+    /** History of already completed operations. */
+    private final GridBoundedConcurrentLinkedHashSet<UUID> completedOpIds =
+        new GridBoundedConcurrentLinkedHashSet<>(HIST_SIZE);
+
+    /** Client futures. */
+    private final Map<UUID, GridFutureAdapter<Boolean>> userFuts = new HashMap<>();
+
+    /** Finished results awaiting discovery finish message. */
+    private final Map<UUID, WalStateResult> ress = new HashMap<>();
+
+    /** Active distributed processes. */
+    private final Map<UUID, WalStateDistributedProcess> procs = new HashMap<>();
+
+    /** Pending results created on cache processor start based on available discovery data. */
+    private final Collection<WalStateResult> initialRess = new LinkedList<>();
+
+    /** Pending acknowledge messages (i.e. received before node completed it's local part). */
+    private final Collection<WalStateAckMessage> pendingAcks = new HashSet<>();
+
+    /** Whether this is a server node. */
+    private final boolean srv;
+
+    /** IO message listener. */
+    private final GridMessageListener ioLsnr;
+
+    /** Operation mutex. */
+    private final Object mux = new Object();
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Current coordinator node. */
+    private ClusterNode crdNode;
+
+    /** Disconnected flag. */
+    private boolean disconnected;
+
+    /**
+     * Constructor.
+     *
+     * @param kernalCtx Kernal context.
+     */
+    public WalStateManager(GridKernalContext kernalCtx) {
+        if (kernalCtx != null) {
+            IgniteConfiguration cfg = kernalCtx.config();
+
+            srv = !cfg.isClientMode() && !cfg.isDaemon();
+
+            log = kernalCtx.log(WalStateManager.class);
+        }
+        else {
+            srv = false;
+
+            log = null;
+        }
+
+        if (srv) {
+            ioLsnr = new GridMessageListener() {
+                @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
+                    if (msg instanceof WalStateAckMessage) {
+                        WalStateAckMessage msg0 = (WalStateAckMessage) msg;
+
+                        msg0.senderNodeId(nodeId);
+
+                        onAck(msg0);
+                    }
+                    else
+                        U.warn(log, "Unexpected IO message (will ignore): " + msg);
+                }
+            };
+        }
+        else
+            ioLsnr = null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void start0() throws IgniteCheckedException {
+        if (srv)
+            cctx.kernalContext().io().addMessageListener(TOPIC_WAL, ioLsnr);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void stop0(boolean cancel) {
+        if (srv)
+            cctx.kernalContext().io().removeMessageListener(TOPIC_WAL, ioLsnr);
+    }
+
+    /**
+     * Callback invoked when caches info is collected inside cache processor start routine. Discovery is not
+     * active at this point.
+     */
+    public void onCachesInfoCollected() {
+        if (!srv)
+            return;
+
+        synchronized (mux) {
+            // Process top pending requests.
+            for (CacheGroupDescriptor grpDesc : cacheProcessor().cacheGroupDescriptors().values()) {
+                WalStateProposeMessage msg = grpDesc.nextWalChangeRequest();
+
+                if (msg != null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Processing WAL state message on start: " + msg);
+
+                    boolean enabled = grpDesc.walEnabled();
+
+                    WalStateResult res;
+
+                    if (F.eq(enabled, msg.enable()))
+                        res = new WalStateResult(msg, false);
+                    else {
+                        res = new WalStateResult(msg, true);
+
+                        grpDesc.walEnabled(!enabled);
+                    }
+
+                    initialRess.add(res);
+
+                    addResult(res);
+                }
+            }
+        }
+    }
+
+    /**
+     * Handle cache processor kernal start. At this point we already collected discovery data from other nodes
+     * (discovery already active), but exchange worker is not active yet. We need to iterate over available group
+     * descriptors and perform top operations, taking in count that no cache operations are possible at this point,
+     * so checkpoint is not needed.
+     */
+    public void onKernalStart() {
+        if (!srv)
+            return;
+
+        synchronized (mux) {
+            for (WalStateResult res : initialRess)
+                onCompletedLocally(res);
+
+            initialRess.clear();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDisconnected(IgniteFuture reconnectFut) {
+        Collection<GridFutureAdapter<Boolean>> userFuts0;
+
+        synchronized (mux) {
+            assert !disconnected;
+
+            disconnected = true;
+
+            userFuts0 = new ArrayList<>(userFuts.values());
+
+            userFuts.clear();
+        }
+
+        for (GridFutureAdapter<Boolean> userFut : userFuts0)
+            completeWithError(userFut, "Client node was disconnected from topology (operation result is unknown).");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onReconnected(boolean active) {
+        synchronized (mux) {
+            assert disconnected;
+
+            disconnected = false;
+        }
+    }
+
+    /**
+     * Initiate WAL mode change operation.
+     *
+     * @param cacheNames Cache names.
+     * @param enabled Enabled flag.
+     * @return Future completed when operation finished.
+     */
+    public IgniteInternalFuture<Boolean> init(Collection<String> cacheNames, boolean enabled) {
+        if (F.isEmpty(cacheNames))
+            return errorFuture("Cache names cannot be empty.");
+
+        synchronized (mux) {
+            if (disconnected)
+                return errorFuture("Failed to initiate WAL mode change because client node is disconnected.");
+
+            // Prepare cache and group infos.
+            Map<String, IgniteUuid> caches = new HashMap<>(cacheNames.size());
+
+            CacheGroupDescriptor grpDesc = null;
+
+            for (String cacheName : cacheNames) {
+                DynamicCacheDescriptor cacheDesc = cacheProcessor().cacheDescriptor(cacheName);
+
+                if (cacheDesc == null)
+                    return errorFuture("Cache doesn't exist: " + cacheName);
+
+                caches.put(cacheName, cacheDesc.deploymentId());
+
+                CacheGroupDescriptor curGrpDesc = cacheDesc.groupDescriptor();
+
+                if (grpDesc == null)
+                    grpDesc = curGrpDesc;
+                else if (!F.eq(grpDesc.deploymentId(), curGrpDesc.deploymentId())) {
+                    return errorFuture("Cannot change WAL mode for caches from different cache groups [" +
+                        "cache1=" + cacheNames.iterator().next() + ", grp1=" + grpDesc.groupName() +
+                        ", cache2=" + cacheName + ", grp2=" + curGrpDesc.groupName() + ']');
+                }
+            }
+
+            assert grpDesc != null;
+
+            HashSet<String> grpCaches = new HashSet<>(grpDesc.caches().keySet());
+
+            grpCaches.removeAll(cacheNames);
+
+            if (!grpCaches.isEmpty()) {
+                return errorFuture("Cannot change WAL mode because not all cache names belonging to the group are " +
+                    "provided [group=" + grpDesc.groupName() + ", missingCaches=" + grpCaches + ']');
+            }
+
+            if (grpDesc.config().getCacheMode() == CacheMode.LOCAL)
+                return errorFuture("WAL mode cannot be changed for LOCAL cache(s): " + cacheNames);
+
+            // WAL mode change makes sense only for persistent groups.
+            if (!grpDesc.persistenceEnabled())
+                return errorFuture("Cannot change WAL mode because persistence is not enabled for cache(s) [" +
+                    "caches=" + cacheNames + ", dataRegion=" + grpDesc.config().getDataRegionName() + ']');
+
+            // Send request.
+            final UUID opId = UUID.randomUUID();
+
+            GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
+
+            fut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+                @Override public void apply(IgniteInternalFuture<Boolean> fut) {
+                    synchronized (mux) {
+                        userFuts.remove(opId);
+                    }
+                }
+            });
+
+            WalStateProposeMessage msg = new WalStateProposeMessage(opId, grpDesc.groupId(), grpDesc.deploymentId(),
+                cctx.localNodeId(), caches, enabled);
+
+            userFuts.put(opId, fut);
+
+            try {
+                cctx.discovery().sendCustomEvent(msg);
+
+                if (log.isDebugEnabled())
+                    log.debug("Initiated WAL state change operation: " + msg);
+            }
+            catch (Exception e) {
+                IgniteCheckedException e0 =
+                    new IgniteCheckedException("Failed to initiate WAL mode change due to unexpected exception.", e);
+
+                fut.onDone(e0);
+            }
+
+            return fut;
+        }
+    }
+
+    /**
+     * Handle propose message in discovery thread.
+     *
+     * @param msg Message.
+     */
+    public void onProposeDiscovery(WalStateProposeMessage msg) {
+        if (isDuplicate(msg))
+            return;
+
+        synchronized (mux) {
+            if (disconnected)
+                return;
+
+            // Validate current caches state before deciding whether to process message further.
+            if (validateProposeDiscovery(msg)) {
+                if (log.isDebugEnabled())
+                    log.debug("WAL state change message is valid (will continue processing): " + msg);
+
+                CacheGroupDescriptor grpDesc = cacheProcessor().cacheGroupDescriptors().get(msg.groupId());
+
+                assert grpDesc != null;
+
+                IgnitePredicate<ClusterNode> nodeFilter = grpDesc.config().getNodeFilter();
+
+                boolean affNode = srv && (nodeFilter == null || nodeFilter.apply(cctx.localNode()));
+
+                msg.affinityNode(affNode);
+
+                if (grpDesc.addWalChangeRequest(msg)) {
+                    msg.exchangeMessage(msg);
+
+                    if (log.isDebugEnabled())
+                        log.debug("WAL state change message will be processed in exchange thread: " + msg);
+                }
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("WAL state change message is added to pending set and will be processed later: " +
+                            msg);
+                }
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug("WAL state change message is invalid (will ignore): " + msg);
+            }
+        }
+    }
+
+    /**
+     * Validate propose message.
+     *
+     * @param msg Message.
+     * @return {@code True} if message should be processed further, {@code false} if no further processing is needed.
+     */
+    private boolean validateProposeDiscovery(WalStateProposeMessage msg) {
+        GridFutureAdapter<Boolean> userFut = userFuts.get(msg.operationId());
+
+        String errMsg = validate(msg);
+
+        if (errMsg != null) {
+            completeWithError(userFut, errMsg);
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Validate propose message.
+     *
+     * @param msg Message.
+     * @return Error message or {@code null} if everything is OK.
+     */
+    @Nullable private String validate(WalStateProposeMessage msg) {
+        // Is group still there?
+        CacheGroupDescriptor grpDesc = cacheProcessor().cacheGroupDescriptors().get(msg.groupId());
+
+        if (grpDesc == null)
+            return "Failed to change WAL mode because some caches no longer exist: " + msg.caches().keySet();
+
+        // Are specified caches still there?
+        for (Map.Entry<String, IgniteUuid> cache : msg.caches().entrySet()) {
+            String cacheName = cache.getKey();
+
+            DynamicCacheDescriptor cacheDesc = cacheProcessor().cacheDescriptor(cacheName);
+
+            if (cacheDesc == null || !F.eq(cacheDesc.deploymentId(), cache.getValue()))
+                return "Cache doesn't exist: " + cacheName;
+        }
+
+        // Are there any new caches in the group?
+        HashSet<String> grpCacheNames = new HashSet<>(grpDesc.caches().keySet());
+
+        grpCacheNames.removeAll(msg.caches().keySet());
+
+        if (!grpCacheNames.isEmpty()) {
+            return "Cannot change WAL mode because not all cache names belonging to the " +
+                "group are provided [group=" + grpDesc.groupName() + ", missingCaches=" + grpCacheNames + ']';
+        }
+
+        return null;
+    }
+
+    /**
+     * Handle propose message which is synchronized with other cache state actions through exchange thread.
+     * If operation is no-op (i.e. state is not changed), then no additional processing is needed, and coordinator will
+     * trigger finish request right away. Otherwise all nodes start asynchronous checkpoint flush, and send responses
+     * to coordinator. Once all responses are received, coordinator node will trigger finish message.
+     *
+     * @param msg Message.
+     */
+    public void onProposeExchange(WalStateProposeMessage msg) {
+        if (!srv)
+            return;
+
+        synchronized (mux) {
+            WalStateResult res = null;
+
+            if (msg.affinityNode()) {
+                // Affinity node, normal processing.
+                CacheGroupContext grpCtx = cacheProcessor().cacheGroup(msg.groupId());
+
+                if (grpCtx == null) {
+                    // Related caches were destroyed concurrently.
+                    res = new WalStateResult(msg, "Failed to change WAL mode because some caches " +
+                        "no longer exist: " + msg.caches().keySet());
+                }
+                else {
+                    if (F.eq(msg.enable(), grpCtx.walEnabled()))
+                        // Nothing changed -> no-op.
+                        res = new WalStateResult(msg, false);
+                    else {
+                        // Initiate a checkpoint.
+                        CheckpointFuture cpFut = triggerCheckpoint(msg.groupId());
+
+                        if (cpFut != null) {
+                            try {
+                                // Wait for checkpoint mark synchronously before releasing the control.
+                                cpFut.beginFuture().get();
+
+                                if (msg.enable()) {
+                                    grpCtx.walEnabled(true);
+
+                                    // Enable: it is enough to release cache operations once mark is finished because
+                                    // not-yet-flushed dirty pages have been logged.
+                                    WalStateChangeWorker worker = new WalStateChangeWorker(msg, cpFut);
+
+                                    new IgniteThread(worker).start();
+                                }
+                                else {
+                                    // Disable: not-yet-flushed operations are not logged, so wait for them
+                                    // synchronously in exchange thread. Otherwise, we cannot define a point in
+                                    // when it is safe to continue cache operations.
+                                    res = awaitCheckpoint(cpFut, msg);
+
+                                    // WAL state is persisted after checkpoint if finished. Otherwise in case of crash
+                                    // and restart we will think that WAL is enabled, but data might be corrupted.
+                                    grpCtx.walEnabled(false);
+                                }
+                            }
+                            catch (Exception e) {
+                                U.warn(log, "Failed to change WAL mode due to unexpected exception [" +
+                                    "msg=" + msg + ']', e);
+
+                                res = new WalStateResult(msg, "Failed to change WAL mode due to unexpected " +
+                                    "exception (see server logs for more information): " + e.getMessage());
+                            }
+                        }
+                        else {
+                            res = new WalStateResult(msg, "Failed to initiate a checkpoint (checkpoint thread " +
+                                "is not available).");
+                        }
+                    }
+                }
+            }
+            else {
+                // We cannot know result on non-affinity server node, so just complete operation with "false" flag,
+                // which will be ignored anyway.
+                res = new WalStateResult(msg, false);
+            }
+
+            if (res != null) {
+                addResult(res);
+
+                onCompletedLocally(res);
+            }
+        }
+    }
+
+    /**
+     * Handle local operation completion.
+     *
+     * @param res Result.
+     */
+    private void onCompletedLocally(WalStateResult res) {
+        assert res != null;
+
+        synchronized (mux) {
+            ClusterNode crdNode = coordinator();
+
+            UUID opId = res.message().operationId();
+
+            WalStateAckMessage msg = new WalStateAckMessage(opId, res.message().affinityNode(),
+                res.changed(), res.errorMessage());
+
+            // Handle distributed completion.
+            if (crdNode.isLocal()) {
+                Collection<ClusterNode> srvNodes = cctx.discovery().aliveServerNodes();
+
+                Collection<UUID> srvNodeIds = new ArrayList<>(srvNodes.size());
+
+                for (ClusterNode srvNode : srvNodes) {
+                    if (cctx.discovery().alive(srvNode))
+                        srvNodeIds.add(srvNode.id());
+                }
+
+                WalStateDistributedProcess proc = new WalStateDistributedProcess(res.message(), srvNodeIds);
+
+                procs.put(res.message().operationId(), proc);
+
+                unwindPendingAcks(proc);
+
+                proc.onNodeFinished(cctx.localNodeId(), msg);
+
+                sendFinishMessageIfNeeded(proc);
+            }
+            else {
+                // Just send message to coordinator.
+                try {
+                    cctx.kernalContext().io().sendToGridTopic(crdNode, TOPIC_WAL, msg, SYSTEM_POOL);
+                }
+                catch (IgniteCheckedException e) {
+                    U.warn(log, "Failed to send ack message to coordinator node [opId=" + opId +
+                        ", node=" + crdNode.id() + ']');
+                }
+            }
+        }
+    }
+
+    /**
+     * Unwind pending ack messages for the given distributed process.
+     *
+     * @param proc Process.
+     */
+    private void unwindPendingAcks(WalStateDistributedProcess proc) {
+        assert Thread.holdsLock(mux);
+
+        Iterator<WalStateAckMessage> iter = pendingAcks.iterator();
+
+        while (iter.hasNext()) {
+            WalStateAckMessage ackMsg = iter.next();
+
+            if (F.eq(proc.operationId(), ackMsg.operationId())) {
+                proc.onNodeFinished(ackMsg.senderNodeId(), ackMsg);
+
+                iter.remove();
+            }
+        }
+    }
+
+    /**
+     * Handle ack message.
+     *
+     * @param msg Ack message.
+     */
+    public void onAck(WalStateAckMessage msg) {
+        synchronized (mux) {
+            if (completedOpIds.contains(msg.operationId()))
+                // Skip stale messages.
+                return;
+
+            WalStateDistributedProcess proc = procs.get(msg.operationId());
+
+            if (proc == null)
+                // If process if not initialized yet, add to pending set.
+                pendingAcks.add(msg);
+            else {
+                // Notify process on node completion.
+                proc.onNodeFinished(msg.senderNodeId(), msg);
+
+                sendFinishMessageIfNeeded(proc);
+            }
+        }
+    }
+
+    /**
+     * Send finish message for the given distributed process if needed.
+     *
+     * @param proc Process.
+     */
+    private void sendFinishMessageIfNeeded(WalStateDistributedProcess proc) {
+        if (proc.completed())
+            sendFinishMessage(proc.prepareFinishMessage());
+    }
+
+    /**
+     * Send finish message.
+     *
+     * @param finishMsg Finish message.
+     */
+    private void sendFinishMessage(WalStateFinishMessage finishMsg) {
+        try {
+            cctx.discovery().sendCustomEvent(finishMsg);
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to send WAL mode change finish message due to unexpected exception: " + finishMsg, e);
+        }
+    }
+
+    /**
+     * Handle finish message in discovery thread.
+     *
+     * @param msg Message.
+     */
+    public void onFinishDiscovery(WalStateFinishMessage msg) {
+        if (isDuplicate(msg))
+            return;
+
+        synchronized (mux) {
+            if (disconnected)
+                return;
+
+            // Complete user future, if any.
+            GridFutureAdapter<Boolean> userFut = userFuts.get(msg.operationId());
+
+            if (userFut != null) {
+                if (msg.errorMessage() != null)
+                    completeWithError(userFut, msg.errorMessage());
+                else
+                    complete(userFut, msg.changed());
+            }
+
+            // Clear pending data.
+            WalStateResult res = ress.remove(msg.operationId());
+
+            if (res == null && srv)
+                U.warn(log, "Received finish message for unknown operation (will ignore): " + msg.operationId());
+
+            procs.remove(msg.operationId());
+
+            CacheGroupDescriptor grpDesc = cacheProcessor().cacheGroupDescriptors().get(msg.groupId());
+
+            if (grpDesc != null && F.eq(grpDesc.deploymentId(), msg.groupDeploymentId())) {
+                // Toggle WAL mode in descriptor.
+                if (msg.changed())
+                    grpDesc.walEnabled(!grpDesc.walEnabled());
+
+                // Remove now-outdated message from the queue.
+                WalStateProposeMessage oldProposeMsg = grpDesc.nextWalChangeRequest();
+
+                assert oldProposeMsg != null;
+                assert F.eq(oldProposeMsg.operationId(), msg.operationId());
+
+                grpDesc.removeWalChangeRequest();
+
+                // Move next message to exchange thread.
+                WalStateProposeMessage nextProposeMsg = grpDesc.nextWalChangeRequest();
+
+                if (nextProposeMsg != null)
+                    msg.exchangeMessage(nextProposeMsg);
+            }
+
+            if (srv) {
+                // Remember operation ID to handle duplicates.
+                completedOpIds.add(msg.operationId());
+
+                // Remove possible stale messages.
+                Iterator<WalStateAckMessage> ackIter = pendingAcks.iterator();
+
+                while (ackIter.hasNext()) {
+                    WalStateAckMessage ackMsg = ackIter.next();
+
+                    if (F.eq(ackMsg.operationId(), msg.operationId()))
+                        ackIter.remove();
+                }
+            }
+        }
+    }
+
+    /**
+     * Handle node leave event.
+     *
+     * @param nodeId Node ID.
+     */
+    public void onNodeLeft(UUID nodeId) {
+        if (!srv)
+            return;
+
+        synchronized (mux) {
+            if (crdNode == null) {
+                assert ress.isEmpty();
+                assert procs.isEmpty();
+
+                return;
+            }
+
+            if (F.eq(crdNode.id(), nodeId)) {
+                // Coordinator exited, re-send to new, or initialize new distirbuted processes.
+                crdNode = null;
+
+                for (WalStateResult res : ress.values())
+                    onCompletedLocally(res);
+            }
+            else if (F.eq(cctx.localNodeId(), crdNode.id())) {
+                // Notify distributed processes on node leave.
+                for (Map.Entry<UUID, WalStateDistributedProcess> procEntry : procs.entrySet()) {
+                    WalStateDistributedProcess proc = procEntry.getValue();
+
+                    proc.onNodeLeft(nodeId);
+
+                    sendFinishMessageIfNeeded(proc);
+                }
+            }
+        }
+    }
+
+    /**
+     * Create future with error.
+     *
+     * @param errMsg Error message.
+     * @return Future.
+     */
+    @SuppressWarnings("Convert2Diamond")
+    private static IgniteInternalFuture<Boolean> errorFuture(String errMsg) {
+        return new GridFinishedFuture<Boolean>(new IgniteCheckedException(errMsg));
+    }
+
+    /**
+     * Complete user future with normal result.
+     *
+     * @param userFut User future.
+     * @param res Result.
+     */
+    private static void complete(@Nullable GridFutureAdapter<Boolean> userFut, boolean res) {
+        if (userFut != null)
+            userFut.onDone(res);
+    }
+
+    /**
+     * Complete user future with error.
+     *
+     * @param errMsg Error message.
+     */
+    private static void completeWithError(@Nullable GridFutureAdapter<Boolean> userFut, String errMsg) {
+        if (userFut != null)
+            userFut.onDone(new IgniteCheckedException(errMsg));
+    }
+
+    /**
+     * @return Cache processor.
+     */
+    private GridCacheProcessor cacheProcessor() {
+        return cctx.cache();
+    }
+
+    /**
+     * Get current coordinator node.
+     *
+     * @return Coordinator node.
+     */
+    private ClusterNode coordinator() {
+        assert Thread.holdsLock(mux);
+
+        if (crdNode != null)
+            return crdNode;
+        else {
+            ClusterNode res = null;
+
+            for (ClusterNode node : cctx.discovery().aliveServerNodes()) {
+                if (res == null || res.order() > node.order())
+                    res = node;
+            }
+
+            assert res != null;
+
+            crdNode = res;
+
+            return res;
+        }
+    }
+
+    /**
+     * Check if discovery message has already been received.
+     *
+     * @param msg Message.
+     * @return {@code True} if this is a duplicate.
+     */
+    private boolean isDuplicate(WalStateAbstractMessage msg) {
+        T2<UUID, Boolean> key;
+
+        if (msg instanceof WalStateProposeMessage)
+            key = new T2<>(msg.operationId(), true);
+        else {
+            assert msg instanceof WalStateFinishMessage;
+
+            key = new T2<>(msg.operationId(), false);
+        }
+
+        if (!discoMsgIdHist.add(key)) {
+            U.warn(log, "Received duplicate WAL mode change discovery message (will ignore): " + msg);
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Add locally result to pending map.
+     *
+     * @param res Result.
+     */
+    private void addResult(WalStateResult res) {
+        ress.put(res.message().operationId(), res);
+    }
+
+    /**
+     * Force checkpoint.
+     *
+     * @param grpId Group ID.
+     * @return Checkpoint future or {@code null} if failed to get checkpointer.
+     */
+    @Nullable private CheckpointFuture triggerCheckpoint(int grpId) {
+        return cctx.database().forceCheckpoint("wal-state-change-grp-" + grpId);
+    }
+
+    /**
+     * Await for the checkpoint to finish.
+     *
+     * @param cpFut Checkpoint future.
+     * @param msg Orignial message which triggered the process.
+     * @return Result.
+     */
+    private WalStateResult awaitCheckpoint(CheckpointFuture cpFut, WalStateProposeMessage msg) {
+        WalStateResult res;
+
+        try {
+            assert msg.affinityNode();
+
+            if (cpFut != null)
+                cpFut.finishFuture().get();
+
+            res = new WalStateResult(msg, true);
+        }
+        catch (Exception e) {
+            U.warn(log, "Failed to change WAL mode due to unexpected exception [msg=" + msg + ']', e);
+
+            res = new WalStateResult(msg, "Failed to change WAL mode due to unexpected exception " +
+                "(see server logs for more information): " + e.getMessage());
+        }
+
+        return res;
+    }
+
+    /**
+     * WAL state change worker.
+     */
+    private class WalStateChangeWorker extends GridWorker {
+        /** Message. */
+        private final WalStateProposeMessage msg;
+
+        /** Checkpoint future. */
+        private final CheckpointFuture cpFut;
+
+        /**
+         * Constructor.
+         *
+         * @param msg Propose message.
+         */
+        private WalStateChangeWorker(WalStateProposeMessage msg, CheckpointFuture cpFut) {
+            super(cctx.igniteInstanceName(), "wal-state-change-worker-" + msg.groupId(), WalStateManager.this.log);
+
+            this.msg = msg;
+            this.cpFut = cpFut;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+            WalStateResult res = awaitCheckpoint(cpFut, msg);
+
+            addResult(res);
+
+            onCompletedLocally(res);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/95d38643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateNodeLeaveExchangeTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateNodeLeaveExchangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateNodeLeaveExchangeTask.java
new file mode 100644
index 0000000..3ac12fc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateNodeLeaveExchangeTask.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Exchange task to handle node leave for WAL state manager.
+ */
+public class WalStateNodeLeaveExchangeTask implements CachePartitionExchangeWorkerTask {
+    /** Node that has left the grid. */
+    private final ClusterNode node;
+
+    /**
+     * Constructor.
+     *
+     * @param node Node that has left the grid.
+     */
+    public WalStateNodeLeaveExchangeTask(ClusterNode node) {
+        assert node != null;
+
+        this.node = node;
+    }
+
+    /**
+     * @return Node that has left the grid.
+     */
+    public ClusterNode node() {
+        return node;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean skipForExchangeMerge() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(WalStateNodeLeaveExchangeTask.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/95d38643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateProposeMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateProposeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateProposeMessage.java
new file mode 100644
index 0000000..747fd6a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateProposeMessage.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * WAL state propose message.
+ */
+public class WalStateProposeMessage extends WalStateAbstractMessage {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Node ID. */
+    private final UUID nodeId;
+
+    /** Cache names which are expected to be in the group along with their deployment IDs. */
+    private Map<String, IgniteUuid> caches;
+
+    /** Whether WAL should be enabled or disabled. */
+    private final boolean enable;
+
+    /** Whether message is being handled on cache affinity node. */
+    private transient boolean affNode;
+
+    /**
+     * Constructor.
+     *
+     * @param opId Operation IDs.
+     * @param grpId Expected group ID.
+     * @param grpDepId Expected group deployment ID.
+     * @param nodeId Node ID.
+     * @param caches Expected cache names and their relevant deployment IDs.
+     *
+     * @param enable WAL state flag.
+     */
+    public WalStateProposeMessage(UUID opId, int grpId, IgniteUuid grpDepId, UUID nodeId,
+        Map<String, IgniteUuid> caches, boolean enable) {
+        super(opId, grpId, grpDepId);
+
+        this.nodeId = nodeId;
+        this.caches = caches;
+        this.enable = enable;
+    }
+
+    /**
+     * @return Node ID.
+     */
+    public UUID nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * @return Caches.
+     */
+    public Map<String, IgniteUuid> caches() {
+        return caches;
+    }
+
+    /**
+     * @return WAL state flag.
+     */
+    public boolean enable() {
+        return enable;
+    }
+
+    /**
+     * @return Whether message is being handled on cache affintiy node.
+     */
+    public boolean affinityNode() {
+        return affNode;
+    }
+
+    /**
+     * @param affNode Whether message is being handled on cache affintiy node.
+     */
+    public void affinityNode(boolean affNode) {
+        this.affNode = affNode;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(WalStateProposeMessage.class, this, "super", super.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/95d38643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateResult.java
new file mode 100644
index 0000000..a2bd0af
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateResult.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Local WAL state change result.
+ */
+public class WalStateResult {
+    /** Original message. */
+    private final WalStateProposeMessage msg;
+
+    /** Whether mode was changed. */
+    private final boolean changed;
+
+    /** Error message (if any). */
+    private final String errMsg;
+
+    /**
+     * Constructor.
+     *
+     * @param msg Original message.
+     * @param changed Whether mode was changed.
+     */
+    public WalStateResult(WalStateProposeMessage msg, boolean changed) {
+        this(msg, changed, null);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param msg Original message.
+     * @param errMsg Error message (if any).
+     */
+    public WalStateResult(WalStateProposeMessage msg, String errMsg) {
+        this(msg, false, errMsg);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param msg Original message.
+     * @param changed Whether mode was changed.
+     * @param errMsg Error message (if any).
+     */
+    private WalStateResult(WalStateProposeMessage msg, boolean changed, String errMsg) {
+        this.msg = msg;
+        this.changed = changed;
+        this.errMsg = errMsg;
+    }
+
+    /**
+     * @return Original message.
+     */
+    public WalStateProposeMessage message() {
+        return msg;
+    }
+
+    /**
+     * @return Whether mode was changed.
+     */
+    public boolean changed() {
+        return changed;
+    }
+
+    /**
+     * @return Error message (if any).
+     */
+    @Nullable public String errorMessage() {
+        return errMsg;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(WalStateResult.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/95d38643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 5733039..7407f6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -563,7 +563,7 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
 
                                             GridCacheVersion dhtVer = cached.isNear() ? writeVersion() : null;
 
-                                            if (!near() && cacheCtx.group().persistenceEnabled() &&
+                                            if (!near() && cacheCtx.group().persistenceEnabled() && cacheCtx.group().walEnabled() &&
                                                 op != NOOP && op != RELOAD && (op != READ || cctx.snapshot().needTxReadLogging())) {
                                                 if (dataEntries == null)
                                                     dataEntries = new ArrayList<>(entries.size());

http://git-wip-us.apache.org/repos/asf/ignite/blob/95d38643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index cd5a951..e1f1d6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -514,7 +514,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
      * @return {@code true} if cas succeeds.
      */
     private boolean casState(long state, GridDhtPartitionState toState) {
-        if (grp.persistenceEnabled()) {
+        if (grp.persistenceEnabled() && grp.walEnabled()) {
             synchronized (this) {
                 boolean update = this.state.compareAndSet(state, setPartState(state, toState));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/95d38643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index cf2c925..883cac6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.LocalJoinCachesContext;
 import org.apache.ignite.internal.processors.cache.StateChangeRequest;
+import org.apache.ignite.internal.processors.cache.WalStateAbstractMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
@@ -602,6 +603,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
                 else if (msg instanceof SnapshotDiscoveryMessage) {
                     exchange = onCustomMessageNoAffinityChange(crdNode);
                 }
+                else if (msg instanceof WalStateAbstractMessage)
+                    exchange = onCustomMessageNoAffinityChange(crdNode);
                 else {
                     assert affChangeMsg != null : this;
 
@@ -1073,6 +1076,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
             }
         }
 
+        changeWalModeIfNeeded();
+
         if (crd.isLocal()) {
             if (remaining.isEmpty())
                 onAllReceived(null);
@@ -1108,6 +1113,37 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte
     }
 
     /**
+     * Change WAL mode if needed.
+     */
+    private void changeWalModeIfNeeded() {
+        WalStateAbstractMessage msg = firstWalMessage();
+
+        if (msg != null)
+            cctx.walState().onProposeExchange(msg.exchangeMessage());
+    }
+
+    /**
+     * Get first message if and only if this is WAL message.
+     *
+     * @return WAL message or {@code null}.
+     */
+    @Nullable private WalStateAbstractMessage firstWalMessage() {
+        if (firstDiscoEvt != null && firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
+            DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)firstDiscoEvt).customMessage();
+
+            if (customMsg instanceof WalStateAbstractMessage) {
+                WalStateAbstractMessage msg0 = (WalStateAbstractMessage)customMsg;
+
+                assert msg0.needExchange();
+
+                return msg0;
+            }
+        }
+
+        return null;
+    }
+
+    /**
      * The main purpose of this method is to wait for all ongoing updates (transactional and atomic), initiated on
      * the previous topology version, to finish to prevent inconsistencies during rebalancing and to prevent two
      * different simultaneous owners of the same lock.

http://git-wip-us.apache.org/repos/asf/ignite/blob/95d38643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java
new file mode 100644
index 0000000..1c77013
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CheckpointFuture.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+
+/**
+ * Checkpoint futures.
+ */
+public interface CheckpointFuture  {
+    /**
+     * @return Begin future.
+     */
+    public GridFutureAdapter beginFuture();
+
+    /**
+     * @return Finish future.
+     */
+    public GridFutureAdapter finishFuture();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/95d38643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 580fb3a..85c4005 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -267,6 +267,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /** */
     private static final String MBEAN_GROUP = "Persistent Store";
 
+    /** WAL marker prefix for meta store. */
+    private static final String WAL_KEY_PREFIX = "grp-wal-disabled-";
+
+    /** WAL marker predicate for meta store. */
+    private static final IgnitePredicate<String> WAL_KEY_PREFIX_PRED = new IgnitePredicate<String>() {
+        @Override public boolean apply(String key) {
+            return key.startsWith(WAL_KEY_PREFIX);
+        }
+    };
+
     /** Checkpoint thread. Needs to be volatile because it is created in exchange worker. */
     private volatile Checkpointer checkpointer;
 
@@ -351,6 +361,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /** */
     private List<MetastorageLifecycleListener> metastorageLifecycleLsnrs;
 
+    /** Initially disabled cache groups. */
+    public Collection<Integer> initiallyWalDisabledGrps;
+
     /**
      * @param ctx Kernal context.
      */
@@ -533,6 +546,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                 applyLastUpdates(status, true);
 
+                initiallyWalDisabledGrps = walDisabledGroups();
+
                 notifyMetastorageReadyForRead();
             }
             finally {
@@ -1527,6 +1542,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         fut2.get();
     }
 
+    /** {@inheritDoc} */
+    @Override public CheckpointFuture forceCheckpoint(String reason) {
+        Checkpointer cp = checkpointer;
+
+        if (cp == null)
+            return null;
+
+        return cp.wakeupForCheckpoint(0, reason);
+    }
+
     /**
      * Tries to search for a WAL pointer for the given partition counter start.
      *
@@ -1825,6 +1850,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         int applied = 0;
         WALPointer lastRead = null;
 
+        Collection<Integer> ignoreGrps = storeOnly ? Collections.emptySet() : initiallyWalDisabledGrps;
+
         try (WALIterator it = cctx.wal().replay(status.endPtr)) {
             while (it.hasNextX()) {
                 IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX();
@@ -1861,27 +1888,29 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                             if (storeOnly && grpId != METASTORAGE_CACHE_ID)
                                 continue;
 
-                            long pageId = pageRec.fullPageId().pageId();
+                            if (!ignoreGrps.contains(grpId)) {
+                                long pageId = pageRec.fullPageId().pageId();
 
-                            PageMemoryEx pageMem = grpId == METASTORAGE_CACHE_ID ? storePageMem : getPageMemoryForCacheGroup(grpId);
+                                PageMemoryEx pageMem = grpId == METASTORAGE_CACHE_ID ? storePageMem : getPageMemoryForCacheGroup(grpId);
 
-                            long page = pageMem.acquirePage(grpId, pageId, true);
-
-                            try {
-                                long pageAddr = pageMem.writeLock(grpId, pageId, page);
+                                long page = pageMem.acquirePage(grpId, pageId, true);
 
                                 try {
-                                    PageUtils.putBytes(pageAddr, 0, pageRec.pageData());
+                                    long pageAddr = pageMem.writeLock(grpId, pageId, page);
+
+                                    try {
+                                        PageUtils.putBytes(pageAddr, 0, pageRec.pageData());
+                                    }
+                                    finally {
+                                        pageMem.writeUnlock(grpId, pageId, page, null, true, true);
+                                    }
                                 }
                                 finally {
-                                    pageMem.writeUnlock(grpId, pageId, page, null, true, true);
+                                    pageMem.releasePage(grpId, pageId, page);
                                 }
-                            }
-                            finally {
-                                pageMem.releasePage(grpId, pageId, page);
-                            }
 
-                            applied++;
+                                applied++;
+                            }
                         }
 
                         break;
@@ -1895,15 +1924,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                             if (storeOnly && gId != METASTORAGE_CACHE_ID)
                                 continue;
 
-                            final int pId = destroyRec.partitionId();
+                            if (!ignoreGrps.contains(gId)) {
+                                final int pId = destroyRec.partitionId();
 
-                            PageMemoryEx pageMem = gId == METASTORAGE_CACHE_ID ? storePageMem : getPageMemoryForCacheGroup(gId);
+                                PageMemoryEx pageMem = gId == METASTORAGE_CACHE_ID ? storePageMem : getPageMemoryForCacheGroup(gId);
 
-                            pageMem.clearAsync(new P3<Integer, Long, Integer>() {
-                                @Override public boolean apply(Integer cacheId, Long pageId, Integer tag) {
-                                    return cacheId == gId && PageIdUtils.partId(pageId) == pId;
-                                }
-                            }, true).get();
+                                pageMem.clearAsync(new P3<Integer, Long, Integer>() {
+                                    @Override public boolean apply(Integer cacheId, Long pageId, Integer tag) {
+                                        return cacheId == gId && PageIdUtils.partId(pageId) == pId;
+                                    }
+                                }, true).get();
+
+                            }
                         }
 
                         break;
@@ -1917,29 +1949,31 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                             if (storeOnly && grpId != METASTORAGE_CACHE_ID)
                                 continue;
 
-                            long pageId = r.pageId();
-
-                            PageMemoryEx pageMem = grpId == METASTORAGE_CACHE_ID ? storePageMem : getPageMemoryForCacheGroup(grpId);
+                            if (!ignoreGrps.contains(grpId)) {
+                                long pageId = r.pageId();
 
-                            // Here we do not require tag check because we may be applying memory changes after
-                            // several repetitive restarts and the same pages may have changed several times.
-                            long page = pageMem.acquirePage(grpId, pageId, true);
+                                PageMemoryEx pageMem = grpId == METASTORAGE_CACHE_ID ? storePageMem : getPageMemoryForCacheGroup(grpId);
 
-                            try {
-                                long pageAddr = pageMem.writeLock(grpId, pageId, page);
+                                // Here we do not require tag check because we may be applying memory changes after
+                                // several repetitive restarts and the same pages may have changed several times.
+                                long page = pageMem.acquirePage(grpId, pageId, true);
 
                                 try {
-                                    r.applyDelta(pageMem, pageAddr);
+                                    long pageAddr = pageMem.writeLock(grpId, pageId, page);
+
+                                    try {
+                                        r.applyDelta(pageMem, pageAddr);
+                                    }
+                                    finally {
+                                        pageMem.writeUnlock(grpId, pageId, page, null, true, true);
+                                    }
                                 }
                                 finally {
-                                    pageMem.writeUnlock(grpId, pageId, page, null, true, true);
+                                    pageMem.releasePage(grpId, pageId, page);
                                 }
-                            }
-                            finally {
-                                pageMem.releasePage(grpId, pageId, page);
-                            }
 
-                            applied++;
+                                applied++;
+                            }
                         }
                 }
             }
@@ -2050,7 +2084,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         checkpointReadLock();
 
         try {
-            restorePartitionState(partStates);
+            restorePartitionState(partStates, Collections.emptySet());
         }
         finally {
             checkpointReadUnlock();
@@ -2073,6 +2107,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         long start = U.currentTimeMillis();
         int applied = 0;
 
+        Collection<Integer> ignoreGrps = metastoreOnly ? Collections.emptySet() : initiallyWalDisabledGrps;
+
         try (WALIterator it = cctx.wal().replay(status.startPtr)) {
             Map<T2<Integer, Integer>, T2<Integer, Long>> partStates = new HashMap<>();
 
@@ -2091,11 +2127,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                         for (DataEntry dataEntry : dataRec.writeEntries()) {
                             int cacheId = dataEntry.cacheId();
 
-                            GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+                            int grpId = cctx.cache().cacheDescriptor(cacheId).groupId();
+
+                            if (!ignoreGrps.contains(grpId)) {
+                                GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
 
-                            applyUpdate(cacheCtx, dataEntry);
+                                applyUpdate(cacheCtx, dataEntry);
 
-                            applied++;
+                                applied++;
+                            }
                         }
 
                         break;
@@ -2106,8 +2146,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                         PartitionMetaStateRecord metaStateRecord = (PartitionMetaStateRecord)rec;
 
-                        partStates.put(new T2<>(metaStateRecord.groupId(), metaStateRecord.partitionId()),
-                            new T2<>((int)metaStateRecord.state(), metaStateRecord.updateCounter()));
+                        if (!ignoreGrps.contains(metaStateRecord.groupId())) {
+                            partStates.put(new T2<>(metaStateRecord.groupId(), metaStateRecord.partitionId()),
+                                new T2<>((int)metaStateRecord.state(), metaStateRecord.updateCounter()));
+                        }
 
                         break;
 
@@ -2152,7 +2194,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             }
 
             if (!metastoreOnly)
-                restorePartitionState(partStates);
+                restorePartitionState(partStates, ignoreGrps);
         }
         finally {
             if (!metastoreOnly)
@@ -2169,10 +2211,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
      * @throws IgniteCheckedException If failed to restore.
      */
     private void restorePartitionState(
-        Map<T2<Integer, Integer>, T2<Integer, Long>> partStates
+        Map<T2<Integer, Integer>, T2<Integer, Long>> partStates,
+        Collection<Integer> ignoreGrps
     ) throws IgniteCheckedException {
         for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-            if (grp.isLocal() || !grp.affinityNode()) {
+            if (grp.isLocal() || !grp.affinityNode() || ignoreGrps.contains(grp.groupId())) {
                 // Local cache has no partitions and its states.
                 continue;
             }
@@ -2935,7 +2978,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                     snapFut = snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, map);
 
                 for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
-                    if (grp.isLocal())
+                    if (grp.isLocal() || !grp.walEnabled())
                         continue;
 
                     List<GridDhtLocalPartition> locParts = new ArrayList<>();
@@ -3458,7 +3501,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /**
      *
      */
-    private static class CheckpointProgressSnapshot {
+    private static class CheckpointProgressSnapshot implements CheckpointFuture {
         /** */
         private final boolean started;
 
@@ -3474,6 +3517,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
             cpBeginFut = cpProgress.cpBeginFut;
             cpFinishFut = cpProgress.cpFinishFut;
         }
+
+        /** {@inheritDoc} */
+        @Override public GridFutureAdapter beginFuture() {
+            return cpBeginFut;
+        }
+
+        /** {@inheritDoc} */
+        @Override public GridFutureAdapter finishFuture() {
+            return cpFinishFut;
+        }
     }
 
     /**
@@ -4181,4 +4234,78 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     @Override public MetaStorage metaStorage() {
         return metaStorage;
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean walEnabled(int grpId) {
+        return !initiallyWalDisabledGrps.contains(grpId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void walEnabled(int grpId, boolean enabled) {
+        String key = walGroupIdToKey(grpId);
+
+        checkpointReadLock();
+
+        try {
+            if (enabled)
+                metaStorage.remove(key);
+            else
+                metaStorage.write(key, true);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException("Failed to write cache group WAL state [grpId=" + grpId +
+                ", enabled=" + enabled + ']', e);
+        }
+        finally {
+            checkpointReadUnlock();
+        }
+    }
+
+    /**
+     * @return List of initially WAL-disabled groups.
+     */
+    private Collection<Integer> walDisabledGroups() {
+        MetaStorage meta = cctx.database().metaStorage();
+
+        try {
+            Set<String> keys = meta.readForPredicate(WAL_KEY_PREFIX_PRED).keySet();
+
+            if (keys.isEmpty())
+                return Collections.emptySet();
+
+            HashSet<Integer> res = new HashSet<>(keys.size());
+
+            for (String key : keys) {
+                int grpId = walKeyToGroupId(key);
+
+                res.add(grpId);
+            }
+
+            return res;
+
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException("Failed to read cache groups WAL state.", e);
+        }
+    }
+
+    /**
+     * Convert cache group ID to WAL state key.
+     *
+     * @param grpId Group ID.
+     * @return Key.
+     */
+    private static String walGroupIdToKey(int grpId) {
+        return WAL_KEY_PREFIX + grpId;
+    }
+
+    /**
+     * Convert WAL state key to cache group ID.
+     *
+     * @param key Key.
+     * @return Group ID.
+     */
+    private static int walKeyToGroupId(String key) {
+        return Integer.parseInt(key.substring(WAL_KEY_PREFIX.length()));
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/95d38643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index f8fd86c..4837f6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -559,7 +559,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
 
             int tag = pageMemory.invalidate(grp.groupId(), p);
 
-            ctx.wal().log(new PartitionDestroyRecord(grp.groupId(), p));
+            if (grp.walEnabled())
+                ctx.wal().log(new PartitionDestroyRecord(grp.groupId(), p));
 
             ctx.pageStore().onPartitionDestroyed(grp.groupId(), p, tag);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/95d38643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index 8658c97..237dacc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -728,6 +728,15 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
     }
 
     /**
+     * Allows to wait checkpoint finished.
+     *
+     * @param reason Reason.
+     */
+    @Nullable public CheckpointFuture forceCheckpoint(String reason) {
+        return null;
+    }
+
+    /**
      * Waits until current state is checkpointed.
      *
      * @throws IgniteCheckedException If failed.
@@ -1002,4 +1011,22 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
     public MetaStorage metaStorage() {
         return null;
     }
+
+    /**
+     * @param grpId Group ID.
+     * @return WAL enabled flag.
+     */
+    public boolean walEnabled(int grpId) {
+        return false;
+    }
+
+    /**
+     * Marks cache group as with disabled WAL.
+     *
+     * @param grpId Group id.
+     * @param enabled flag.
+     */
+    public void walEnabled(int grpId, boolean enabled) {
+        // No-op.
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/95d38643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index c9b7278..26e46b2 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolde
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
 import org.apache.ignite.internal.processors.cache.persistence.AllocatedPageTracker;
 import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
@@ -234,7 +235,6 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
     /** {@inheritDoc} */
     @Override public void storeCacheData(StoredCacheData cacheData, boolean overwrite) throws IgniteCheckedException {
         File cacheWorkDir = cacheWorkDir(cacheData.config());
-
         File file;
 
         checkAndInitCacheWorkDir(cacheWorkDir);
@@ -738,6 +738,21 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
         return store;
     }
 
+    /** {@inheritDoc} */
+    @Override public void beforeCacheGroupStart(CacheGroupDescriptor grpDesc) {
+        if (grpDesc.persistenceEnabled() && !cctx.database().walEnabled(grpDesc.groupId())) {
+            File dir = cacheWorkDir(grpDesc.config());
+
+            assert dir.exists();
+
+            boolean res = IgniteUtils.delete(dir);
+
+            assert res;
+
+            grpDesc.walEnabled(false);
+        }
+    }
+
     /**
      * @param pageStoreFileIoFactory File IO factory to override default, may be used for blocked read-write.
      * @param pageStoreV1FileIoFactory File IO factory for reading V1 page store and for fast touching page files

http://git-wip-us.apache.org/repos/asf/ignite/blob/95d38643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
index f0b2e16..d57b939 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.persistence.metastorage;
 
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
@@ -47,8 +48,10 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageParti
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.SimpleDataPageIO;
 import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
 import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
+import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.jetbrains.annotations.NotNull;
@@ -66,6 +69,9 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
     /** */
     public static final int METASTORAGE_CACHE_ID = CU.cacheId(METASTORAGE_CACHE_NAME);
 
+    /** Marker for removed entry. */
+    private static final byte[] TOMBSTONE = new byte[0];
+
     /** */
     private final IgniteWriteAheadLogManager wal;
 
@@ -144,7 +150,63 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
         if (data != null)
             result = marshaller.unmarshal(data, getClass().getClassLoader());
 
-        return (Serializable) result;
+        return (Serializable)result;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, Serializable> readForPredicate(IgnitePredicate<String> keyPred)
+        throws IgniteCheckedException {
+        Map<String, Serializable> res = null;
+
+        if (readOnly) {
+            if (empty)
+                return Collections.emptyMap();
+
+            if (lastUpdates != null) {
+                for (Map.Entry<String, byte[]> lastUpdate : lastUpdates.entrySet()) {
+                    if (keyPred.apply(lastUpdate.getKey())) {
+                        byte[] valBytes = lastUpdate.getValue();
+
+                        if (valBytes == TOMBSTONE)
+                            continue;
+
+                        if (res == null)
+                            res = new HashMap<>();
+
+                        Serializable val = marshaller.unmarshal(valBytes, getClass().getClassLoader());
+
+                        res.put(lastUpdate.getKey(), val);
+                    }
+                }
+            }
+        }
+
+        GridCursor<MetastorageDataRow> cur = tree.find(null, null);
+
+        while (cur.next()) {
+            MetastorageDataRow row = cur.get();
+
+            String key = row.key();
+            byte[] valBytes = row.value();
+
+            if (keyPred.apply(key)) {
+                // Either already added it, or this is a tombstone -> ignore.
+                if (lastUpdates != null && lastUpdates.containsKey(key))
+                    continue;
+
+                if (res == null)
+                    res = new HashMap<>();
+
+                Serializable val = marshaller.unmarshal(valBytes, getClass().getClassLoader());
+
+                res.put(key, val);
+            }
+        }
+
+        if (res == null)
+            res = Collections.emptyMap();
+
+        return res;
     }
 
     /** {@inheritDoc} */
@@ -187,10 +249,10 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
     public byte[] getData(String key) throws IgniteCheckedException {
         if (readOnly) {
             if (lastUpdates != null) {
-                byte[] lastUpdate = lastUpdates.get(key);
+                byte[] res = lastUpdates.get(key);
 
-                if (lastUpdate != null)
-                    return lastUpdate;
+                if (res != null)
+                    return res != TOMBSTONE ? res : null;
             }
 
             if (empty)
@@ -342,7 +404,7 @@ public class MetaStorage implements DbCheckpointListener, ReadOnlyMetastorage, R
             if (lastUpdates == null)
                 lastUpdates = new HashMap<>();
 
-            lastUpdates.put(key, value);
+            lastUpdates.put(key, value != null ? value : TOMBSTONE);
         }
         else {
             if (value != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/95d38643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadOnlyMetastorage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadOnlyMetastorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadOnlyMetastorage.java
index 9b73676..390c2e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadOnlyMetastorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/ReadOnlyMetastorage.java
@@ -17,7 +17,10 @@
 package org.apache.ignite.internal.processors.cache.persistence.metastorage;
 
 import java.io.Serializable;
+import java.util.Map;
+
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.lang.IgnitePredicate;
 
 /**
  *
@@ -25,4 +28,13 @@ import org.apache.ignite.IgniteCheckedException;
 public interface ReadOnlyMetastorage {
     /** */
     Serializable read(String key) throws IgniteCheckedException;
+
+    /**
+     * Read all keys matching provided predicate.
+     *
+     * @param keyPred Key predicate.
+     * @return Matched key-value pairs.
+     * @throws IgniteCheckedException If failed.
+     */
+    Map<String, Serializable> readForPredicate(IgnitePredicate<String> keyPred) throws IgniteCheckedException;
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/95d38643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index f392400..bcc8939 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -485,17 +485,18 @@ public class PageMemoryImpl implements PageMemoryEx {
                 if (PageIO.getType(pageAddr) == 0) {
                     trackingIO.initNewPage(pageAddr, pageId, pageSize());
 
-                    if (!ctx.wal().isAlwaysWriteFullPages())
-                        ctx.wal().log(
-                            new InitNewPageRecord(
-                                cacheId,
-                                pageId,
-                                trackingIO.getType(),
-                                trackingIO.getVersion(), pageId
-                            )
-                        );
-                    else
-                        ctx.wal().log(new PageSnapshot(fullId, absPtr + PAGE_OVERHEAD, pageSize()));
+                    if (!ctx.wal().disabled(fullId.groupId()))
+                        if (!ctx.wal().isAlwaysWriteFullPages())
+                            ctx.wal().log(
+                                new InitNewPageRecord(
+                                    cacheId,
+                                    pageId,
+                                    trackingIO.getType(),
+                                    trackingIO.getVersion(), pageId
+                                )
+                            );
+                        else
+                            ctx.wal().log(new PageSnapshot(fullId, absPtr + PAGE_OVERHEAD, pageSize()));
                 }
             }
 
@@ -1480,7 +1481,7 @@ public class PageMemoryImpl implements PageMemoryEx {
      *
      */
     void beforeReleaseWrite(FullPageId pageId, long ptr, boolean pageWalRec) {
-        if (walMgr != null && (pageWalRec || walMgr.isAlwaysWriteFullPages())) {
+        if (walMgr != null && (pageWalRec || walMgr.isAlwaysWriteFullPages()) && !walMgr.disabled(pageId.groupId())) {
             try {
                 walMgr.log(new PageSnapshot(pageId, ptr, pageSize()));
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/95d38643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java
index 3316980..8f854cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/util/PageHandler.java
@@ -449,7 +449,7 @@ public abstract class PageHandler<X, R> {
         Boolean walPlc) {
         // If the page is clean, then it is either newly allocated or just after checkpoint.
         // In both cases we have to write full page contents to WAL.
-        return wal != null && !wal.isAlwaysWriteFullPages() && walPlc != TRUE &&
+        return wal != null && !wal.isAlwaysWriteFullPages() && walPlc != TRUE && !wal.disabled(cacheId) &&
             (walPlc == FALSE || pageMem.isDirty(cacheId, pageId, page));
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/95d38643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 444a207..7b3e938 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -79,6 +79,7 @@ import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
 import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
 import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
@@ -909,6 +910,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         return archiver0 != null && archiver0.reserved(fPtr.index());
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean disabled(int grpId) {
+        CacheGroupContext ctx = cctx.cache().cacheGroup(grpId);
+
+        return ctx != null && !ctx.walEnabled();
+    }
+
     /**
      * Lists files in archive directory and returns the index of last archived file.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/95d38643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index d30dddb..0c7bbb3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -202,7 +202,7 @@ public class IgniteWalIteratorFactory {
 
         return new GridCacheSharedContext<>(
             kernalCtx, null, null, null,
-            null, null, dbMgr, null,
+            null, null, null, dbMgr, null,
             null, null, null, null,
             null, null, null);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/95d38643/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 2a0b176..8dec59a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy;
@@ -759,7 +760,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                             nearCached.innerReload();
                                     }
                                     else if (op == READ) {
-                                        if (cacheCtx.group().persistenceEnabled() && cctx.snapshot().needTxReadLogging()) {
+                                        CacheGroupContext grp = cacheCtx.group();
+
+                                        if (grp.persistenceEnabled() && grp.walEnabled() &&
+                                            cctx.snapshot().needTxReadLogging()) {
                                             ptr = cctx.wal().log(new DataRecord(new DataEntry(
                                                 cacheCtx.cacheId(),
                                                 txEntry.key(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/95d38643/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
index 344e6fe..a490cfc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java
@@ -194,6 +194,13 @@ public class ClientListenerProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @return Server port.
+     */
+    public int port() {
+        return srv.port();
+    }
+
+    /**
      * Prepare connector configuration.
      *
      * @param cfg Ignote configuration.


Mime
View raw message