ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [16/49] ignite git commit: GG-12140 We will lose data if we cancel snapshot restore (cherry picked from commit 8e3ad6d)
Date Fri, 02 Jun 2017 17:13:29 GMT
GG-12140 We will lose data if we cancel snapshot restore
(cherry picked from commit 8e3ad6d)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7b545fa9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7b545fa9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7b545fa9

Branch: refs/heads/ignite-5398
Commit: 7b545fa9029ba9f3d90828cd38611f6a2988cb25
Parents: d4c9997
Author: EdShangGG <eshangareev@gridgain.com>
Authored: Tue May 16 19:07:03 2017 +0300
Committer: EdShangGG <eshangareev@gridgain.com>
Committed: Thu May 18 16:15:35 2017 +0300

----------------------------------------------------------------------
 .../pagemem/snapshot/SnapshotOperation.java     |   4 +-
 .../pagemem/snapshot/SnapshotOperationType.java |   2 +
 ...artSnapshotOperationAckDiscoveryMessage.java |   8 ++
 .../GridCachePartitionExchangeManager.java      |   2 +-
 .../processors/cache/GridCacheProcessor.java    |  32 +++++-
 .../IgniteCacheDatabaseSharedManager.java       |   7 ++
 .../GridDhtPartitionsExchangeFuture.java        | 115 ++++---------------
 .../query/h2/database/H2TreeIndex.java          |   3 +-
 8 files changed, 75 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7b545fa9/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java
index 93054ec..98f295c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java
@@ -105,7 +105,9 @@ public class SnapshotOperation implements Serializable {
      * @param op Op.
      */
     public static Collection<File> getOptionalPathsParameter(SnapshotOperation op)
{
-        assert (op.type() == SnapshotOperationType.CHECK || op.type() == SnapshotOperationType.RESTORE)
+        assert (op.type() == SnapshotOperationType.CHECK ||
+                op.type() == SnapshotOperationType.RESTORE ||
+                op.type() == SnapshotOperationType.RESTORE_2_PHASE)
             && (op.extraParameter() == null || op.extraParameter() instanceof Collection);
 
         return (Collection<File>)op.extraParameter();

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b545fa9/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperationType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperationType.java
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperationType.java
index 3fa6d2a..c3b3a2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperationType.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperationType.java
@@ -23,6 +23,8 @@ public enum SnapshotOperationType {
     CREATE,
     /** Restore. */
     RESTORE,
+    /** Restore 2. */
+    RESTORE_2_PHASE,
     /** Move. */
     MOVE,
     /** Delete. */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b545fa9/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/StartSnapshotOperationAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/StartSnapshotOperationAckDiscoveryMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/StartSnapshotOperationAckDiscoveryMessage.java
index 72defd4..af7648d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/StartSnapshotOperationAckDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/StartSnapshotOperationAckDiscoveryMessage.java
@@ -81,6 +81,14 @@ public class StartSnapshotOperationAckDiscoveryMessage implements DiscoveryCusto
     /**
      *
      */
+    public boolean needExchange() {
+        /* exchange for trigger saving cluster state*/
+        return err == null && snapshotOperation.type() == SnapshotOperationType.CREATE;
+    }
+
+    /**
+     *
+     */
     public IgniteUuid operationId() {
         return opId;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b545fa9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 79166f2..0f6a656 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -282,7 +282,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(evt.eventNode(),
msg);
                     }
                     else if (customMsg instanceof StartSnapshotOperationAckDiscoveryMessage
-                        && !((StartSnapshotOperationAckDiscoveryMessage)customMsg).hasError())
{
+                        && ((StartSnapshotOperationAckDiscoveryMessage)customMsg).needExchange())
{
                         exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type());
 
                         exchFut = exchangeFuture(exchId, evt, null, null, null);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b545fa9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index bf7a4fd..b339bd4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1963,6 +1963,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (!F.isEmpty(reqs) && err == null) {
             Collection<IgniteBiTuple<GridCacheContext, Boolean>> stopped = null;
 
+            boolean prepared = false;
+
             for (DynamicCacheChangeRequest req : reqs) {
                 String masked = maskNull(req.cacheName());
 
@@ -1970,6 +1972,12 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 boolean destroy = false;
 
                 if (req.stop()) {
+                    if (!prepared) {
+                        sharedCtx.database().prepareCachesStop();
+
+                        prepared = true;
+                    }
+
                     stopGateway(req);
 
                     sharedCtx.database().checkpointReadLock();
@@ -2610,6 +2618,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
      */
     public IgniteInternalFuture<?> dynamicDestroyCaches(Collection<String> cacheNames,
boolean checkThreadTx,
         boolean restart) {
+        return dynamicDestroyCaches(cacheNames, checkThreadTx, restart, true);
+    }
+
+    /**
+     * @param cacheNames Collection of cache names to destroy.
+     * @param checkThreadTx If {@code true} checks that current thread does not have active
transactions.
+     * @return Future that will be completed when cache is destroyed.
+     */
+    public IgniteInternalFuture<?> dynamicDestroyCaches(Collection<String> cacheNames,
boolean checkThreadTx,
+        boolean restart, boolean destroy) {
         if (checkThreadTx)
             checkEmptyTransactions();
 
@@ -2619,7 +2637,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(),
cacheName, ctx.localNodeId());
 
             t.stop(true);
-            t.destroy(true);
+            t.destroy(destroy);
 
             t.restart(restart);
 
@@ -2915,7 +2933,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             return sharedCtx.affinity().onCustomEvent(((CacheAffinityChangeMessage)msg));
 
         if (msg instanceof StartSnapshotOperationAckDiscoveryMessage &&
-            ((StartSnapshotOperationAckDiscoveryMessage)msg).error() == null)
+            ((StartSnapshotOperationAckDiscoveryMessage)msg).needExchange())
             return true;
 
         if (msg instanceof DynamicCacheChangeBatch)
@@ -3398,6 +3416,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * @param name Name.
+     */
+    public void restart(@Nullable String name) {
+        IgniteCacheProxy jcache = (IgniteCacheProxy) jCacheProxies.get(maskNull(name));
+
+        if (jcache != null)
+            jcache.restart();
+    }
+
+    /**
      * @param name Cache name.
      * @return Cache instance for given name.
      * @throws IgniteCheckedException If failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b545fa9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
index 6220c43..11d924e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java
@@ -196,6 +196,13 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
     }
 
     /**
+     * Needed action before any cache will stop
+     */
+    public void prepareCachesStop() {
+        // No-op.
+    }
+
+    /**
      * @param stoppedCtxs A collection of tuples (cache context, destroy flag).
      */
     public void onCachesStopped(Collection<IgniteBiTuple<GridCacheContext, Boolean>>
stoppedCtxs) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b545fa9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index fff1702..7a95193 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -51,9 +51,7 @@ import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot;
 import org.apache.ignite.internal.pagemem.snapshot.SnapshotOperation;
-import org.apache.ignite.internal.pagemem.snapshot.SnapshotOperationType;
 import org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDiscoveryMessage;
-import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
@@ -64,7 +62,6 @@ import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
-import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
@@ -571,31 +568,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     exchange = CU.clientNode(discoEvt.eventNode()) ?
                         onClientNodeEvent(crdNode) :
                         onServerNodeEvent(crdNode);
-
-                    StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg = (StartSnapshotOperationAckDiscoveryMessage)msg;
-
-                    if (!cctx.localNode().isDaemon()) {
-                        SnapshotOperation op = snapshotOperationMsg.snapshotOperation();
-
-                        if (op.type() == SnapshotOperationType.RESTORE) {
-                            if (reqs != null)
-                                reqs = new ArrayList<>(reqs);
-                            else
-                                reqs = new ArrayList<>();
-
-                            List<DynamicCacheChangeRequest> destroyRequests = getStopCacheRequests(
-                                cctx.cache(), op.cacheNames(), cctx.localNodeId());
-
-                            reqs.addAll(destroyRequests);
-
-                            if (!reqs.isEmpty()) { //Emulate destroy cache request
-                                if (op.type() == SnapshotOperationType.RESTORE)
-                                    cctx.cache().onCustomEvent(new DynamicCacheChangeBatch(reqs),
topVer);
-
-                                onCacheChangeRequest(crdNode);
-                            }
-                        }
-                    }
                 }
                 else {
                     assert affChangeMsg != null : this;
@@ -645,6 +617,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                     assert false;
             }
 
+            if (cctx.localNode().isClient())
+                startLocalSnasphotOperation();
+
             exchLog.info("Finish exchange init [topVer=" + topVer + ", crd=" + crdNode +
']');
         }
         catch (IgniteInterruptedCheckedException e) {
@@ -663,36 +638,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /**
-     * @param cache Cache.
-     * @param cacheNames Cache names.
-     * @param locNodeId Local node id.
-     */
-    @NotNull public static List<DynamicCacheChangeRequest> getStopCacheRequests(GridCacheProcessor
cache,
-        Set<String> cacheNames, UUID locNodeId) {
-        List<DynamicCacheChangeRequest> destroyRequests = new ArrayList<>();
-
-        for (String cacheName : cacheNames) {
-            DynamicCacheDescriptor desc = cache.cacheDescriptor(CU.cacheId(cacheName));
-
-            if (desc == null)
-                continue;
-
-            DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(),
cacheName, locNodeId);
-
-            t.stop(true);
-            t.destroy(true);
-
-            t.deploymentId(desc.deploymentId());
-
-            t.restart(true);
-
-            destroyRequests.add(t);
-        }
-
-        return destroyRequests;
-    }
-
-    /**
      * @throws IgniteCheckedException If failed.
      */
     private void initTopologies() throws IgniteCheckedException {
@@ -930,18 +875,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
         cctx.database().beforeExchange(this);
 
-        StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg = getSnapshotOperationMessage();
-
-        // If it's a snapshot operation request, synchronously wait for backup start.
-        if (snapshotOperationMsg != null) {
-            if (!cctx.localNode().isClient() && !cctx.localNode().isDaemon()) {
-                SnapshotOperation op = snapshotOperationMsg.snapshotOperation();
-
-                if (op.type() != SnapshotOperationType.RESTORE)
-                    startLocalSnasphotOperation(snapshotOperationMsg);
-            }
-        }
-
         if (crd.isLocal()) {
             if (remaining.isEmpty())
                 onAllReceived();
@@ -952,16 +885,24 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         initDone();
     }
 
-    /**
-     * @param snapOpMsg Snapshot operation message.
-     */
-    private void startLocalSnasphotOperation(StartSnapshotOperationAckDiscoveryMessage snapOpMsg
-    ) throws IgniteCheckedException {
-        IgniteInternalFuture fut = cctx.database()
-            .startLocalSnapshotOperation(snapOpMsg.initiatorNodeId(), snapOpMsg.snapshotOperation());
+    /** */
+    private void startLocalSnasphotOperation() {
+        StartSnapshotOperationAckDiscoveryMessage snapOpMsg = getSnapshotOperationMessage();
+
+        if (snapOpMsg != null) {
+            SnapshotOperation op = snapOpMsg.snapshotOperation();
+
+            try {
+                IgniteInternalFuture fut = cctx.database()
+                    .startLocalSnapshotOperation(snapOpMsg.initiatorNodeId(), snapOpMsg.snapshotOperation());
 
-        if (fut != null)
-            fut.get();
+                if (fut != null)
+                    fut.get();
+            }
+            catch (IgniteCheckedException e) {
+                log.error("Error while starting snapshot operation", e);
+            }
+        }
     }
 
     /**
@@ -1306,6 +1247,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             cacheValidRes = m;
         }
 
+        startLocalSnasphotOperation();
+
         cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs, err);
 
         cctx.exchange().onExchangeDone(this, err);
@@ -1315,20 +1258,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
                 cctx.cache().completeStartFuture(req);
         }
 
-        StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg = getSnapshotOperationMessage();
-
-        if (snapshotOperationMsg != null && !cctx.localNode().isClient() &&
!cctx.localNode().isDaemon()) {
-            SnapshotOperation op = snapshotOperationMsg.snapshotOperation();
-
-            if (op.type() == SnapshotOperationType.RESTORE)
-                try {
-                    startLocalSnasphotOperation(snapshotOperationMsg);
-                }
-                catch (IgniteCheckedException e) {
-                    log.error("Error while starting snapshot operation", e);
-                }
-        }
-
         if (exchangeOnChangeGlobalState && err == null)
             cctx.kernalContext().state().onExchangeDone();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b545fa9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
index be5be0a..dcfdec9 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
@@ -371,7 +371,8 @@ public class H2TreeIndex extends GridH2IndexBase {
     @Override public void destroy() {
         try {
             if (cctx.affinityNode()) {
-                tree.destroy();
+                if (!cctx.kernalContext().cache().context().database().persistenceEnabled())
+                    tree.destroy();
 
                 cctx.offheap().dropRootPageForIndex(tree.getName());
             }


Mime
View raw message