ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [23/51] [abbrv] ignite git commit: ignite-2523 : Generalized usage of GridNearAtomicUpdateRequest/Response.
Date Thu, 25 Feb 2016 12:31:22 GMT
ignite-2523 : Generalized usage of GridNearAtomicUpdateRequest/Response.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cb5bdb3f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cb5bdb3f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cb5bdb3f

Branch: refs/heads/ignite-2523
Commit: cb5bdb3f19cefe14380ac169d49e6e86bde1899a
Parents: 3c8d02a
Author: Ilya Lantukh <ilantukh@gridgain.com>
Authored: Mon Feb 8 18:51:55 2016 +0300
Committer: Ilya Lantukh <ilantukh@gridgain.com>
Committed: Mon Feb 8 18:51:55 2016 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          | 45 +++++++++++---------
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |  8 ++--
 2 files changed, 30 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cb5bdb3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index b0504db..05205e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
 import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
@@ -139,7 +140,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500);
 
     /** Update reply closure. */
-    private CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> updateReplyClos;
+    private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
 
     /** Pending  */
     private ConcurrentMap<UUID, DeferredResponseBuffer> pendingResponses = new ConcurrentHashMap8<>();
@@ -192,9 +193,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         });
 
-        updateReplyClos = new CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse>() {
+        updateReplyClos = new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
             @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-            @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicMultipleUpdateResponse res) {
+            @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
                 if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
                     assert req.writeSynchronizationMode() != FULL_ASYNC : req;
 
@@ -1311,7 +1312,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     public void updateAllAsyncInternal(
         final UUID nodeId,
         final GridNearAtomicUpdateRequest req,
-        final CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb
+        final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
     ) {
         IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion());
 
@@ -1336,10 +1337,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     public void updateAllAsyncInternal0(
         UUID nodeId,
         GridNearAtomicUpdateRequest req,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb
+        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
     ) {
-        GridNearAtomicMultipleUpdateResponse res = new GridNearAtomicMultipleUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
-            ctx.deploymentEnabled());
+        GridNearAtomicUpdateResponse res;
+
+        if (req instanceof GridNearAtomicSingleUpdateRequest)
+            res = new GridNearAtomicSingleUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
+                ctx.deploymentEnabled());
+        else
+            res = new GridNearAtomicMultipleUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
+                ctx.deploymentEnabled());
 
         List<KeyCacheObject> keys = req.keys();
 
@@ -1424,7 +1431,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             UpdateBatchResult updRes = updateWithBatch(node,
                                 hasNear,
                                 req,
-                                res,
+                                (GridNearAtomicMultipleUpdateResponse) res,
                                 locked,
                                 ver,
                                 dhtFut,
@@ -1563,7 +1570,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         List<GridDhtCacheEntry> locked,
         GridCacheVersion ver,
         @Nullable GridDhtAtomicUpdateFuture dhtFut,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb,
+        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         boolean replicate,
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiry,
@@ -1797,7 +1804,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                     if (intercept) {
                         CacheObject old = entry.innerGet(
-                             null,
+                            null,
                             /*read swap*/true,
                             /*read through*/ctx.loadPreviousValue(),
                             /*fail fast*/false,
@@ -1812,7 +1819,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             req.keepBinary());
 
                         Object val = ctx.config().getInterceptor().onBeforePut(new CacheLazyEntry(ctx, entry.key(),
-                            old, req.keepBinary()),
+                                old, req.keepBinary()),
                             updated.value(ctx.cacheObjectContext(), false));
 
                         if (val == null)
@@ -1975,11 +1982,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         ClusterNode node,
         boolean hasNear,
         GridNearAtomicUpdateRequest req,
-        GridNearAtomicMultipleUpdateResponse res,
+        GridNearAtomicUpdateResponse res,
         List<GridDhtCacheEntry> locked,
         GridCacheVersion ver,
         @Nullable GridDhtAtomicUpdateFuture dhtFut,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb,
+        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         boolean replicate,
         String taskName,
         @Nullable IgniteCacheExpiryPolicy expiry,
@@ -2214,7 +2221,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         @Nullable Collection<KeyCacheObject> rmvKeys,
         @Nullable Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap,
         @Nullable GridDhtAtomicUpdateFuture dhtFut,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb,
+        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         final GridNearAtomicUpdateRequest req,
         final GridNearAtomicMultipleUpdateResponse res,
         boolean replicate,
@@ -2688,8 +2695,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @Nullable private GridDhtAtomicUpdateFuture createDhtFuture(
         GridCacheVersion writeVer,
         GridNearAtomicUpdateRequest updateReq,
-        GridNearAtomicMultipleUpdateResponse updateRes,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb,
+        GridNearAtomicUpdateResponse updateRes,
+        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         boolean force
     ) {
         if (!force) {
@@ -2944,9 +2951,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @param nodeId Originating node ID.
      * @param res Near update response.
      */
-    private void sendNearUpdateReply(UUID nodeId, GridNearAtomicMultipleUpdateResponse res) {
+    private void sendNearUpdateReply(UUID nodeId, GridNearAtomicUpdateResponse res) {
         try {
-            ctx.io().send(nodeId, res, ctx.ioPolicy());
+            ctx.io().send(nodeId, (GridCacheMessage) res, ctx.ioPolicy());
         }
         catch (ClusterTopologyCheckedException ignored) {
             U.warn(log, "Failed to send near update reply to node because it left grid: " +
@@ -3188,7 +3195,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                 respVers.add(ver);
 
-                if  (respVers.sizex() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && guard.compareAndSet(false, true))
+                if (respVers.sizex() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && guard.compareAndSet(false, true))
                     snd = true;
             }
             finally {

http://git-wip-us.apache.org/repos/asf/ignite/blob/cb5bdb3f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 68c639d..3a31700 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -77,7 +77,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
 
     /** Completion callback. */
     @GridToStringExclude
-    private final CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb;
+    private final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
 
     /** Mappings. */
     @GridToStringInclude
@@ -90,7 +90,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
     private final GridNearAtomicUpdateRequest updateReq;
 
     /** Update response. */
-    private final GridNearAtomicMultipleUpdateResponse updateRes;
+    private final GridNearAtomicUpdateResponse updateRes;
 
     /** Future keys. */
     private final Collection<KeyCacheObject> keys;
@@ -110,10 +110,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
      */
     public GridDhtAtomicUpdateFuture(
         GridCacheContext cctx,
-        CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb,
+        CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
         GridCacheVersion writeVer,
         GridNearAtomicUpdateRequest updateReq,
-        GridNearAtomicMultipleUpdateResponse updateRes
+        GridNearAtomicUpdateResponse updateRes
     ) {
         this.cctx = cctx;
         this.writeVer = writeVer;


Mime
View raw message