ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [14/50] incubator-ignite git commit: # ignite-23
Date Fri, 29 May 2015 12:46:21 GMT
# ignite-23


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e4e54bad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e4e54bad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e4e54bad

Branch: refs/heads/ignite-929
Commit: e4e54bad1481a64e6b270231158d2d299441eb9d
Parents: ff17caf
Author: sboikov <sboikov@gridgain.com>
Authored: Mon May 25 10:00:46 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon May 25 11:12:32 2015 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          |   8 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  10 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   6 +-
 .../dht/atomic/GridNearAtomicUpdateRequest.java | 112 ++++++-----
 .../colocated/GridDhtColocatedLockFuture.java   |   3 +
 ...niteCacheClientNodeChangingTopologyTest.java | 184 +++++++++++++++++--
 6 files changed, 258 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4e54bad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 7fe847a..c6a2bf7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1023,8 +1023,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         IgniteCacheExpiryPolicy expiry = null;
 
-        boolean clientReq = false;
-
         try {
             // If batch store update is enabled, we need to lock all entries.
             // First, need to acquire locks on cache entries, then check filter.
@@ -1052,13 +1050,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         return;
                     }
 
-                    clientReq = CU.clientNode(node);
-
                     // Do not check topology version for CLOCK versioning since
                     // partition exchange will wait for near update future.
                     // Also do not check topology version if topology was locked on near node by
                     // external transaction or explicit lock.
-                    if ((req.fastMap() && !clientReq) || req.topologyLocked() ||
+                    if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
                         !needRemap(req.topologyVersion(), topology().topologyVersion(), req.keys())) {
                         boolean hasNear = ctx.discovery().cacheNearNode(node, name());
 
@@ -1161,7 +1157,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
         catch (GridDhtInvalidPartitionException ignore) {
-            assert !req.fastMap() || clientReq : req;
+            assert !req.fastMap() || req.clientRequest() : req;
 
             if (log.isDebugEnabled())
                 log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4e54bad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 40ab104..ff8454e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -86,6 +86,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     /** Future keys. */
     private Collection<KeyCacheObject> keys;
 
+    /** */
+    private boolean waitForExchange;
+
     /**
      * @param cctx Cache context.
      * @param completionCb Callback to invoke when future is completed.
@@ -113,6 +116,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class);
 
         keys = new ArrayList<>(updateReq.keys().size());
+
+        boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest());
+
+        waitForExchange = !topLocked;
     }
 
     /** {@inheritDoc} */
@@ -164,8 +171,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
     /** {@inheritDoc} */
     @Override public boolean waitForPartitionExchange() {
-        // Wait dht update futures in PRIMARY mode.
-        return cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+        return waitForExchange;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4e54bad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 82659ca..50c3d56 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -614,7 +614,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                 filter,
                 subjId,
                 taskNameHash,
-                skipStore);
+                skipStore,
+                cctx.kernalContext().clientNode());
 
             req.addUpdateEntry(cacheKey,
                 val,
@@ -755,7 +756,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
                             filter,
                             subjId,
                             taskNameHash,
-                            skipStore);
+                            skipStore,
+                            cctx.kernalContext().clientNode());
 
                         pendingMappings.put(nodeId, mapped);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4e54bad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index a96a666..86c5ab8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -132,6 +132,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     /** Skip write-through to a persistent storage. */
     private boolean skipStore;
 
+    /** */
+    private boolean clientReq;
+
     /**
      * Empty constructor required by {@link Externalizable}.
      */
@@ -148,6 +151,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
      * @param fastMap Fast map scheme flag.
      * @param updateVer Update version set if fast map is performed.
      * @param topVer Topology version.
+     * @param topLocked Topology locked flag.
      * @param syncMode Synchronization mode.
      * @param op Cache update operation.
      * @param retval Return value required flag.
@@ -157,6 +161,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash code.
      * @param skipStore Skip write-through to a persistent storage.
+     * @param clientReq Client node request flag.
      */
     public GridNearAtomicUpdateRequest(
         int cacheId,
@@ -174,7 +179,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         @Nullable CacheEntryPredicate[] filter,
         @Nullable UUID subjId,
         int taskNameHash,
-        boolean skipStore
+        boolean skipStore,
+        boolean clientReq
     ) {
         this.cacheId = cacheId;
         this.nodeId = nodeId;
@@ -193,6 +199,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
         this.skipStore = skipStore;
+        this.clientReq = clientReq;
 
         keys = new ArrayList<>();
     }
@@ -266,6 +273,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
     }
 
     /**
+     * @return {@code True} if request sent from client node.
+     */
+    public boolean clientRequest() {
+        return clientReq;
+    }
+
+    /**
      * @return Cache write synchronization mode.
      */
     public CacheWriteSynchronizationMode writeSynchronizationMode() {
@@ -574,126 +588,132 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
+                if (!writer.writeBoolean("clientReq", clientReq))
                     return false;
 
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeMessage("conflictTtls", conflictTtls))
+                if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
                     return false;
 
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("conflictTtls", conflictTtls))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
+                if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
+                if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeBoolean("fastMap", fastMap))
+                if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
+                if (!writer.writeBoolean("fastMap", fastMap))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeMessage("futVer", futVer))
+                if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeBoolean("hasPrimary", hasPrimary))
+                if (!writer.writeMessage("futVer", futVer))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
+                if (!writer.writeBoolean("hasPrimary", hasPrimary))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
+                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 14:
-                if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+                if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeBoolean("retval", retval))
+                if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeBoolean("skipStore", skipStore))
+                if (!writer.writeBoolean("retval", retval))
                     return false;
 
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeBoolean("skipStore", skipStore))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeBoolean("topLocked", topLocked))
+                if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
             case 21:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeBoolean("topLocked", topLocked))
                     return false;
 
                 writer.incrementState();
 
             case 22:
-                if (!writer.writeMessage("updateVer", updateVer))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 23:
+                if (!writer.writeMessage("updateVer", updateVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 24:
                 if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
                     return false;
 
@@ -716,7 +736,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
         switch (reader.state()) {
             case 3:
-                conflictExpireTimes = reader.readMessage("conflictExpireTimes");
+                clientReq = reader.readBoolean("clientReq");
 
                 if (!reader.isLastRead())
                     return false;
@@ -724,7 +744,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 4:
-                conflictTtls = reader.readMessage("conflictTtls");
+                conflictExpireTimes = reader.readMessage("conflictExpireTimes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -732,7 +752,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 5:
-                conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
+                conflictTtls = reader.readMessage("conflictTtls");
 
                 if (!reader.isLastRead())
                     return false;
@@ -740,7 +760,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 6:
-                entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
+                conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -748,7 +768,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 7:
-                expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
+                entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
 
                 if (!reader.isLastRead())
                     return false;
@@ -756,7 +776,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 8:
-                fastMap = reader.readBoolean("fastMap");
+                expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -764,7 +784,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 9:
-                filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
+                fastMap = reader.readBoolean("fastMap");
 
                 if (!reader.isLastRead())
                     return false;
@@ -772,7 +792,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 10:
-                futVer = reader.readMessage("futVer");
+                filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
 
                 if (!reader.isLastRead())
                     return false;
@@ -780,7 +800,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 11:
-                hasPrimary = reader.readBoolean("hasPrimary");
+                futVer = reader.readMessage("futVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -788,7 +808,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 12:
-                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+                hasPrimary = reader.readBoolean("hasPrimary");
 
                 if (!reader.isLastRead())
                     return false;
@@ -796,7 +816,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 13:
-                keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
 
                 if (!reader.isLastRead())
                     return false;
@@ -804,6 +824,14 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 14:
+                keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 15:
                 byte opOrd;
 
                 opOrd = reader.readByte("op");
@@ -815,7 +843,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 15:
+            case 16:
                 retval = reader.readBoolean("retval");
 
                 if (!reader.isLastRead())
@@ -823,7 +851,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 16:
+            case 17:
                 skipStore = reader.readBoolean("skipStore");
 
                 if (!reader.isLastRead())
@@ -831,7 +859,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 17:
+            case 18:
                 subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
@@ -839,7 +867,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 18:
+            case 19:
                 byte syncModeOrd;
 
                 syncModeOrd = reader.readByte("syncMode");
@@ -851,7 +879,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 19:
+            case 20:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -859,7 +887,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 20:
+            case 21:
                 topLocked = reader.readBoolean("topLocked");
 
                 if (!reader.isLastRead())
@@ -867,7 +895,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 21:
+            case 22:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -875,7 +903,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 22:
+            case 23:
                 updateVer = reader.readMessage("updateVer");
 
                 if (!reader.isLastRead())
@@ -883,7 +911,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 23:
+            case 24:
                 vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -903,7 +931,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 24;
+        return 25;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4e54bad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 500495a..cc8f064 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -341,6 +341,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
                 else if (log.isDebugEnabled())
                     log.debug("Transaction was not marked rollback-only while locks were not acquired: " + tx);
             }
+
+            for (KeyCacheObject key : GridDhtColocatedLockFuture.this.keys)
+                cctx.mvcc().removeExplicitLock(threadId, key, lockVer);
         }
 
         cctx.mvcc().recheckPendingLocks();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e4e54bad/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index c01ef6f..4603aaf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -24,8 +24,11 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.plugin.extensions.communication.*;
@@ -38,6 +41,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.testframework.*;
 import org.apache.ignite.testframework.junits.common.*;
 import org.apache.ignite.transactions.*;
+import org.eclipse.jetty.util.*;
 import org.jetbrains.annotations.*;
 
 import javax.cache.*;
@@ -96,21 +100,53 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
      * @throws Exception If failed.
      */
     public void testAtomicPutAllClockMode() throws Exception {
-        atomicPutAll(CLOCK);
+        atomicPut(CLOCK, true, null);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testAtomicPutAllPrimaryMode() throws Exception {
-        atomicPutAll(PRIMARY);
+        atomicPut(PRIMARY, true, null);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicPutAllNearEnabledClockMode() throws Exception {
+        atomicPut(CLOCK, true, new NearCacheConfiguration());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicPutAllNearEnabledPrimaryMode() throws Exception {
+        atomicPut(PRIMARY, true, new NearCacheConfiguration());
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicPutClockMode() throws Exception {
+        atomicPut(CLOCK, false, null);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicPutPrimaryMode() throws Exception {
+        atomicPut(PRIMARY, false, null);
     }
 
     /**
      * @param writeOrder Write order.
+     * @param putAll If {@code true} executes putAll.
+     * @param nearCfg Near cache configuration.
      * @throws Exception If failed.
      */
-    private void atomicPutAll(CacheAtomicWriteOrderMode writeOrder) throws Exception {
+    private void atomicPut(CacheAtomicWriteOrderMode writeOrder,
+        final boolean putAll,
+        @Nullable NearCacheConfiguration nearCfg) throws Exception {
         ccfg = new CacheConfiguration();
 
         ccfg.setCacheMode(PARTITIONED);
@@ -123,15 +159,21 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
         IgniteEx ignite0 = startGrid(0);
         IgniteEx ignite1 = startGrid(1);
 
+        ccfg.setNearConfiguration(nearCfg);
+
         client = true;
 
+        ccfg.setNearConfiguration(null);
+
         Ignite ignite2 = startGrid(2);
 
         assertTrue(ignite2.configuration().isClientMode());
 
         final Map<Integer, Integer> map = new HashMap<>();
 
-        for (int i = 0; i < 100; i++)
+        final int KEYS = putAll ? 100 : 1;
+
+        for (int i = 0; i < KEYS; i++)
             map.put(i, i);
 
         TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
@@ -148,7 +190,10 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
             @Override public Object call() throws Exception {
                 Thread.currentThread().setName("put-thread");
 
-                cache.putAll(map);
+                if (putAll)
+                    cache.putAll(map);
+                else
+                    cache.put(0, 0);
 
                 return null;
             }
@@ -172,7 +217,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
 
         map.clear();
 
-        for (int i = 0; i < 100; i++)
+        for (int i = 0; i < KEYS; i++)
             map.put(i, i + 1);
 
         // Block messages requests for single node.
@@ -182,7 +227,10 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
             @Override public Object call() throws Exception {
                 Thread.currentThread().setName("put-thread");
 
-                cache.putAll(map);
+                if (putAll)
+                    cache.putAll(map);
+                else
+                    cache.put(0, 1);
 
                 return null;
             }
@@ -202,10 +250,13 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
 
         checkData(map, cache, 4);
 
-        for (int i = 0; i < 100; i++)
+        for (int i = 0; i < KEYS; i++)
             map.put(i, i + 2);
 
-        cache.putAll(map);
+        if (putAll)
+            cache.putAll(map);
+        else
+            cache.put(0, 2);
 
         checkData(map, cache, 4);
     }
@@ -401,7 +452,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
     /**
      * @throws Exception If failed.
      */
-    public void _testLock() throws Exception {
+    public void testLock() throws Exception {
         lock(null);
     }
 
@@ -740,6 +791,72 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testLockFromClientBlocksExchange() throws Exception {
+        ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setBackups(1);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setRebalanceMode(SYNC);
+
+        startGrid(0);
+        startGrid(1);
+
+        client = true;
+
+        Ignite ignite2 = startGrid(2);
+
+        IgniteCache<Integer, Integer> cache = ignite2.cache(null);
+
+        Lock lock = cache.lock(0);
+
+        lock.lock();
+
+        IgniteInternalFuture<?> startFut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                client = false;
+
+                startGrid(3);
+
+                return null;
+            }
+        });
+
+        U.sleep(2000);
+
+        assertFalse(startFut.isDone());
+
+        AffinityTopologyVersion ver = new AffinityTopologyVersion(4);
+
+        List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+        U.sleep(2000);
+
+        for (int i = 0; i < 3; i++) {
+            Ignite ignite = ignite(i);
+
+            IgniteInternalFuture<?> fut =
+                ((IgniteKernal)ignite).context().cache().context().exchange().affinityReadyFuture(ver);
+
+            assertNotNull(fut);
+
+            assertFalse(fut.isDone());
+
+            futs.add(fut);
+        }
+
+        lock.unlock();
+
+        for (IgniteInternalFuture<?> fut : futs)
+            fut.get(10_000);
+
+        startFut.get(10_000);
+    }
+
+    /**
      * @param map Expected data.
      * @param clientCache Client cache.
      * @param expNodes Expected nodes number.
@@ -764,11 +881,35 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
                     for (Map.Entry<Integer, Integer> e : map.entrySet()) {
                         Integer key = e.getKey();
 
+                        GridCacheVersion ver = null;
+
                         for (Ignite node : nodes) {
                             IgniteCache<Integer, Integer> cache = node.cache(null);
 
-                            if (aff.isPrimaryOrBackup(node.cluster().localNode(), key) || node == nearCacheNode)
+                            boolean affNode = aff.isPrimaryOrBackup(node.cluster().localNode(), key);
+
+                            if (affNode || node == nearCacheNode) {
                                 assertEquals("Unexpected value for " + node.name(), e.getValue(), cache.localPeek(key));
+
+                                GridCacheAdapter cache0 = ((IgniteKernal)node).internalCache(null);
+
+                                if (affNode && cache0.isNear())
+                                    cache0 = ((GridNearCacheAdapter)cache0).dht();
+
+                                GridCacheEntryEx entry = cache0.peekEx(key);
+
+                                assertNotNull("No entry [node=" + node.name() + ", key=" + key + ']', entry);
+
+                                GridCacheVersion ver0 = entry instanceof GridNearCacheEntry ?
+                                    ((GridNearCacheEntry)entry).dhtVersion() : entry.version();
+
+                                assertNotNull("Null version [node=" + node.name() + ", key=" + key + ']', ver0);
+
+                                if (ver == null)
+                                    ver = ver0;
+                                else
+                                    assertEquals(ver0, ver);
+                            }
                             else
                                 assertNull("Unexpected non-null value for " + node.name(), cache.localPeek(key));
                         }
@@ -779,6 +920,9 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
 
                     return false;
                 }
+                catch (Exception e) {
+                    fail("Unexpected exception: " + e);
+                }
 
                 return true;
             }
@@ -860,6 +1004,8 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
 
         final int THREADS = CLIENT_CNT * 3;
 
+        final ConcurrentHashSet<Integer> putKeys = new ConcurrentHashSet<>();
+
         try {
             GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
                 @Override public Object call() throws Exception {
@@ -887,8 +1033,11 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
                     while (!stop.get()) {
                         TreeMap<Integer, Integer> map = new TreeMap<>();
 
-                        for (int i = 0; i < 100; i++)
-                            map.put(rnd.nextInt(0, 1000), i);
+                        for (int i = 0; i < 100; i++) {
+                            Integer key = rnd.nextInt(0, 1000);
+
+                            map.put(key, key);
+                        }
 
                         try {
                             if (useTx) {
@@ -904,6 +1053,8 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
                             }
                             else
                                 cache.putAll(map);
+
+                            putKeys.addAll(map.keySet());
                         }
                         catch (CacheException | IgniteException e) {
                             log.info("Update failed, ignore: " + e);
@@ -1002,6 +1153,13 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
 
         for (IgniteInternalFuture<?> fut : futs)
             fut.get();
+
+        Map<Integer, Integer> map = new HashMap<>();
+
+        for (Integer key : putKeys)
+            map.put(key, key);
+
+        checkData(map, grid(SRV_CNT).cache(null), SRV_CNT + CLIENT_CNT);
     }
 
     /**



Mime
View raw message