ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates
Date Wed, 22 Jul 2015 04:12:28 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-104 9dec3b7b1 -> 2ac79893c


IGNITE-104 - Ordered ATOMIC updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2ac79893
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2ac79893
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2ac79893

Branch: refs/heads/ignite-104
Commit: 2ac79893cc21517c73e85a1bcc18c33b59140fb2
Parents: 9dec3b7
Author: Valentin Kulichenko <vkulichenko@gridgain.com>
Authored: Tue Jul 21 21:12:23 2015 -0700
Committer: Valentin Kulichenko <vkulichenko@gridgain.com>
Committed: Tue Jul 21 21:12:23 2015 -0700

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          | 36 +++++++++++---------
 1 file changed, 19 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2ac79893/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index bdaa994..aaf373d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -188,24 +188,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         processNearAtomicUpdateRequest(nodeId, req);
                     }
                 });
-            }
-        }
-        else {
-            ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
-                @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
-                    processNearAtomicUpdateRequest(nodeId, req);
-                }
-            });
-        }
 
-        ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() {
-            @Override public void apply(UUID nodeId, GridNearAtomicUpdateResponse res) {
-                processNearAtomicUpdateResponse(nodeId, res);
-            }
-        });
-
-        if (ctx.config().isAtomicOrderedUpdates()) {
-            for (int part = 0; part < ctx.affinity().partitions(); part++) {
                 ctx.io().addOrderedHandler(CU.partitionMessageTopic(ctx, part, false), new CI2<UUID, GridDhtAtomicUpdateRequest>() {
                     @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
                         processDhtAtomicUpdateRequest(nodeId, req);
@@ -214,6 +197,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
         else {
+            ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
+                @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
+                    processNearAtomicUpdateRequest(nodeId, req);
+                }
+            });
+
             ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
                 @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
                     processDhtAtomicUpdateRequest(nodeId, req);
@@ -221,6 +210,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             });
         }
 
+        ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() {
+            @Override public void apply(UUID nodeId, GridNearAtomicUpdateResponse res) {
+                processNearAtomicUpdateResponse(nodeId, res);
+            }
+        });
+
         ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateResponse.class, new CI2<UUID, GridDhtAtomicUpdateResponse>() {
             @Override public void apply(UUID nodeId, GridDhtAtomicUpdateResponse res) {
                 processDhtAtomicUpdateResponse(nodeId, res);
@@ -247,6 +242,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @Override public void stop() {
         for (DeferredResponseBuffer buf : pendingResponses.values())
             buf.finish();
+
+        if (ctx.config().isAtomicOrderedUpdates()) {
+            for (int part = 0; part < ctx.affinity().partitions(); part++) {
+                ctx.io().removeOrderedHandler(CU.partitionMessageTopic(ctx, part, true));
+                ctx.io().removeOrderedHandler(CU.partitionMessageTopic(ctx, part, false));
+            }
+        }
     }
 
     /**


Mime
View raw message