ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [34/48] incubator-ignite git commit: # IGNITE-283: Fixed DR start issue.
Date Thu, 19 Feb 2015 18:43:56 GMT
# IGNITE-283: Fixed DR start issue.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/47fa3cef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/47fa3cef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/47fa3cef

Branch: refs/heads/ignite-292
Commit: 47fa3cefae42cca7ddc2b859208480a4122db319
Parents: c93d86f
Author: vozerov <vozerov@gridgain.com>
Authored: Thu Feb 19 14:55:33 2015 +0300
Committer: vozerov <vozerov@gridgain.com>
Committed: Thu Feb 19 14:55:33 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheContext.java      |  5 +++
 .../processors/cache/GridCacheProcessor.java    |  4 ++
 .../GridDistributedTxRemoteAdapter.java         | 11 +++--
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 44 ++++++++++----------
 4 files changed, 36 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/47fa3cef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 060a825..3ec013c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -320,7 +320,12 @@ public class GridCacheContext<K, V> implements Externalizable {
             expiryPlc = null;
 
         itHolder = new CacheWeakQueryIteratorsHolder(log);
+    }
 
+    /**
+     * Initialize conflict resolver after all managers are started.
+     */
+    void initConflictResolver() {
         // Conflict resolver is determined in two stages:
         // 1. If DR receiver hub is enabled, then pick it from DR manager.
         // 2. Otherwise instantiate default resolver in case local store is configured.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/47fa3cef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index d038e91..e99c706 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -854,6 +854,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             for (GridCacheManager mgr : F.view(cacheCtx.managers(), F.notContains(dhtExcludes(cacheCtx))))
                 mgr.start(cacheCtx);
 
+            cacheCtx.initConflictResolver();
+
             if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) {
                 GridCacheContext<?, ?> dhtCtx = cacheCtx.near().dht().context();
 
@@ -861,6 +863,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 for (GridCacheManager mgr : dhtManagers(dhtCtx))
                     mgr.start(dhtCtx);
 
+                dhtCtx.initConflictResolver();
+
                 // Start DHT cache.
                 dhtCtx.cache().start();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/47fa3cef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 1ae5778..e36947e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -509,17 +509,16 @@ public class GridDistributedTxRemoteAdapter<K, V> extends IgniteTxAdapter<K, V>
                                     if (txEntry.ttl() == CU.TTL_ZERO)
                                         op = DELETE;
 
-
                                     boolean drNeedResolve = cacheCtx.conflictNeedResolve();
 
-                                        if (drNeedResolve) {
-                                            IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>>
-                                                drRes = conflictResolve(op, txEntry.key(), val, valBytes,
-                                                txEntry.ttl(), txEntry.conflictExpireTime(), explicitVer, cached);
+                                    if (drNeedResolve) {
+                                        IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext<K, V>>
+                                            drRes = conflictResolve(op, txEntry.key(), val, valBytes,
+                                            txEntry.ttl(), txEntry.conflictExpireTime(), explicitVer, cached);
 
                                         assert drRes != null;
 
-                                            GridCacheVersionConflictContext<K, V> drCtx = drRes.get2();
+                                        GridCacheVersionConflictContext<K, V> drCtx = drRes.get2();
 
                                         if (drCtx.isUseOld())
                                             op = NOOP;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/47fa3cef/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 14b8248..ef3de55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -85,13 +85,13 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
     /** Optional arguments for entry processor. */
     private Object[] invokeArgs;
 
-    /** DR put values. */
+    /** Conflict put values. */
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
-    private Collection<GridCacheDrInfo<V>> drPutVals;
+    private Collection<GridCacheDrInfo<V>> conflictPutVals;
 
-    /** DR remove values. */
+    /** Conflict remove values. */
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
-    private Collection<GridCacheVersion> drRmvVals;
+    private Collection<GridCacheVersion> conflictRmvVals;
 
     /** Mappings. */
     @GridToStringInclude
@@ -174,8 +174,8 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
      * @param keys Keys to update.
      * @param vals Values or transform closure.
      * @param invokeArgs Optional arguments for entry processor.
-     * @param drPutVals DR put values (optional).
-     * @param drRmvVals DR remove values (optional).
+     * @param conflictPutVals Conflict put values (optional).
+     * @param conflictRmvVals Conflict remove values (optional).
      * @param retval Return value require flag.
      * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
      * @param cached Cached entry if keys size is 1.
@@ -192,8 +192,8 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
         Collection<? extends K> keys,
         @Nullable Collection<?> vals,
         @Nullable Object[] invokeArgs,
-        @Nullable Collection<GridCacheDrInfo<V>> drPutVals,
-        @Nullable Collection<GridCacheVersion> drRmvVals,
+        @Nullable Collection<GridCacheDrInfo<V>> conflictPutVals,
+        @Nullable Collection<GridCacheVersion> conflictRmvVals,
         final boolean retval,
         final boolean rawRetval,
         @Nullable GridCacheEntryEx<K, V> cached,
@@ -207,8 +207,8 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
         this.rawRetval = rawRetval;
 
         assert vals == null || vals.size() == keys.size();
-        assert drPutVals == null || drPutVals.size() == keys.size();
-        assert drRmvVals == null || drRmvVals.size() == keys.size();
+        assert conflictPutVals == null || conflictPutVals.size() == keys.size();
+        assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
         assert cached == null || keys.size() == 1;
         assert subjId != null;
 
@@ -219,8 +219,8 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
         this.keys = keys;
         this.vals = vals;
         this.invokeArgs = invokeArgs;
-        this.drPutVals = drPutVals;
-        this.drRmvVals = drRmvVals;
+        this.conflictPutVals = conflictPutVals;
+        this.conflictRmvVals = conflictRmvVals;
         this.retval = retval;
         this.cached = cached;
         this.expiryPlc = expiryPlc;
@@ -530,19 +530,19 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
                 drExpireTime = -1;
                 drVer = null;
             }
-            else if (drPutVals != null) {
-                GridCacheDrInfo<V> drPutVal =  F.first(drPutVals);
+            else if (conflictPutVals != null) {
+                GridCacheDrInfo<V> drPutVal =  F.first(conflictPutVals);
 
                 val = drPutVal.value();
                 drTtl = drPutVal.ttl();
                 drExpireTime = drPutVal.expireTime();
                 drVer = drPutVal.version();
             }
-            else if (drRmvVals != null) {
+            else if (conflictRmvVals != null) {
                 val = null;
                 drTtl = -1;
                 drExpireTime = -1;
-                drVer = F.first(drRmvVals);
+                drVer = F.first(conflictRmvVals);
             }
             else {
                 val = null;
@@ -616,13 +616,13 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
 
         Iterator<GridCacheDrInfo<V>> drPutValsIt = null;
 
-        if (drPutVals != null)
-            drPutValsIt = drPutVals.iterator();
+        if (conflictPutVals != null)
+            drPutValsIt = conflictPutVals.iterator();
 
         Iterator<GridCacheVersion> drRmvValsIt = null;
 
-        if (drRmvVals != null)
-            drRmvValsIt = drRmvVals.iterator();
+        if (conflictRmvVals != null)
+            drRmvValsIt = conflictRmvVals.iterator();
 
         Map<UUID, GridNearAtomicUpdateRequest<K, V>> pendingMappings = new HashMap<>(topNodes.size(), 1.0f);
 
@@ -661,7 +661,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
                         throw err;
                     }
                 }
-                else if (drPutVals != null) {
+                else if (conflictPutVals != null) {
                     GridCacheDrInfo<V> drPutVal =  drPutValsIt.next();
 
                     val = drPutVal.value();
@@ -669,7 +669,7 @@ public class GridNearAtomicUpdateFuture<K, V> extends GridFutureAdapter<Object>
                     drExpireTime = drPutVal.expireTime();
                     drVer = drPutVal.version();
                 }
-                else if (drRmvVals != null) {
+                else if (conflictRmvVals != null) {
                     val = null;
                     drTtl = -1;
                     drExpireTime = -1;


Mime
View raw message