ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject ignite git commit: Check whether deferred update response affects performance.
Date Wed, 25 Nov 2015 13:45:40 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-no-deferred-res 9f0d4988a -> 24cbb4964


Check whether deferred update response affects performance.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/24cbb496
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/24cbb496
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/24cbb496

Branch: refs/heads/ignite-no-deferred-res
Commit: 24cbb49642309b7d062070f243014e040ba7708b
Parents: 9f0d498
Author: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Authored: Wed Nov 25 16:45:27 2015 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Wed Nov 25 16:45:27 2015 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   | 177 ++++++++++++-------
 1 file changed, 111 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/24cbb496/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 2269110..b837dd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -84,6 +84,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     @GridToStringInclude
     private final Map<UUID, GridDhtAtomicUpdateRequest> mappings;
 
+    /** */
+    private GridDhtAtomicUpdateRequest singleReq;
+
     /** Entries with readers. */
     private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
 
@@ -129,7 +132,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class);
 
         keys = new ArrayList<>(updateReq.keys().size());
-        mappings = U.newHashMap(updateReq.keys().size());
+        mappings = updateReq.keys().size() == 1 ? U.<UUID, GridDhtAtomicUpdateRequest>newHashMap(updateReq.keys().size()) : null;
 
         boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest());
 
@@ -161,22 +164,39 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
     private boolean registerResponse(UUID nodeId) {
         int resCnt0;
 
-        GridDhtAtomicUpdateRequest req = mappings.get(nodeId);
+        Map<UUID, GridDhtAtomicUpdateRequest> map = mappings;
 
-        if (req != null) {
-            synchronized (this) {
-                if (req.onResponse()) {
-                    resCnt0 = resCnt;
+        if (map != null) {
+            GridDhtAtomicUpdateRequest req = map.get(nodeId);
+
+            if (req != null) {
+                synchronized (this) {
+                    if (req.onResponse()) {
+                        resCnt0 = resCnt;
 
-                    resCnt0 += 1;
+                        resCnt0 += 1;
 
-                    resCnt = resCnt0;
+                        resCnt = resCnt0;
+                    }
+                    else
+                        return false;
                 }
-                else
-                    return false;
+
+                if (resCnt0 == map.size())
+                    onDone();
+
+                return true;
             }
+        }
+        else {
+            boolean done = false;
 
-            if (resCnt0 == mappings.size())
+            synchronized (this) {
+                if (singleReq.onResponse())
+                    done = true;
+            }
+
+            if (done)
                 onDone();
 
             return true;
@@ -241,7 +261,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             UUID nodeId = node.id();
 
             if (!nodeId.equals(cctx.localNodeId())) {
-                GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
+                GridDhtAtomicUpdateRequest updateReq = mappings == null ? singleReq : mappings.get(nodeId);
 
                 if (updateReq == null) {
                     updateReq = new GridDhtAtomicUpdateRequest(
@@ -258,7 +278,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                         cctx.deploymentEnabled(),
                         this.updateReq.keepBinary());
 
-                    mappings.put(nodeId, updateReq);
+                    if (mappings != null)
+                        mappings.put(nodeId, updateReq);
+                    else
+                        singleReq = updateReq;
                 }
 
                 updateReq.addWriteValue(entry.key(),
@@ -352,58 +375,58 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             if (cctx.config().getWriteSynchronizationMode() == FULL_SYNC)
                 cctx.mvcc().removeAtomicFuture(version());
 
-            if (err != null) {
-                if (!mappings.isEmpty()) {
-                    Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
-
-                    exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
-                        for (int i = 0; i < req.size(); i++) {
-                            KeyCacheObject key = req.key(i);
-
-                            if (!hndKeys.contains(key)) {
-                                updateRes.addFailedKey(key, err);
-
-                                cctx.continuousQueries().skipUpdateEvent(key, req.partitionId(i), req.updateCounter(i),
-                                    updateReq.topologyVersion());
-
-                                hndKeys.add(key);
-
-                                if (hndKeys.size() == keys.size())
-                                    break exit;
-                            }
-                        }
-                    }
-                }
-                else
-                    for (KeyCacheObject key : keys)
-                        updateRes.addFailedKey(key, err);
-            }
-            else {
-                Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
-
-                exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
-                    for (int i = 0; i < req.size(); i++) {
-                        KeyCacheObject key = req.key(i);
-
-                        if (!hndKeys.contains(key)) {
-                            try {
-                                cctx.continuousQueries().onEntryUpdated(key, req.value(i), req.localPreviousValue(i),
-                                    key.internal() || !cctx.userCache(), req.partitionId(i), true, false,
-                                    req.updateCounter(i), updateReq.topologyVersion());
-                            }
-                            catch (IgniteCheckedException e) {
-                                U.warn(log, "Failed to send continuous query message. [key=" + key + ", newVal="
-                                    + req.value(i) + ", err=" + e + "]");
-                            }
-
-                            hndKeys.add(key);
-
-                            if (hndKeys.size() == keys.size())
-                                break exit;
-                        }
-                    }
-                }
-            }
+//            if (err != null) {
+//                if (!mappings.isEmpty()) {
+//                    Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
+//
+//                    exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
+//                        for (int i = 0; i < req.size(); i++) {
+//                            KeyCacheObject key = req.key(i);
+//
+//                            if (!hndKeys.contains(key)) {
+//                                updateRes.addFailedKey(key, err);
+//
+//                                cctx.continuousQueries().skipUpdateEvent(key, req.partitionId(i), req.updateCounter(i),
+//                                    updateReq.topologyVersion());
+//
+//                                hndKeys.add(key);
+//
+//                                if (hndKeys.size() == keys.size())
+//                                    break exit;
+//                            }
+//                        }
+//                    }
+//                }
+//                else
+//                    for (KeyCacheObject key : keys)
+//                        updateRes.addFailedKey(key, err);
+//            }
+//            else {
+//                Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size());
+//
+//                exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) {
+//                    for (int i = 0; i < req.size(); i++) {
+//                        KeyCacheObject key = req.key(i);
+//
+//                        if (!hndKeys.contains(key)) {
+//                            try {
+//                                cctx.continuousQueries().onEntryUpdated(key, req.value(i), req.localPreviousValue(i),
+//                                    key.internal() || !cctx.userCache(), req.partitionId(i), true, false,
+//                                    req.updateCounter(i), updateReq.topologyVersion());
+//                            }
+//                            catch (IgniteCheckedException e) {
+//                                U.warn(log, "Failed to send continuous query message. [key=" + key + ", newVal="
+//                                    + req.value(i) + ", err=" + e + "]");
+//                            }
+//
+//                            hndKeys.add(key);
+//
+//                            if (hndKeys.size() == keys.size())
+//                                break exit;
+//                        }
+//                    }
+//                }
+//            }
 
             if (updateReq.writeSynchronizationMode() == FULL_SYNC)
                 completionCb.apply(updateReq, updateRes);
@@ -418,7 +441,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
      * Sends requests to remote nodes.
      */
     public void map() {
-        if (!mappings.isEmpty()) {
+        if (mappings != null && !mappings.isEmpty()) {
             for (GridDhtAtomicUpdateRequest req : mappings.values()) {
                 try {
                     if (log.isDebugEnabled())
@@ -440,6 +463,28 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                 }
             }
         }
+        else if (singleReq != null) {
+            GridDhtAtomicUpdateRequest req = singleReq;
+
+            try {
+                if (log.isDebugEnabled())
+                    log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
+
+                cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+            }
+            catch (ClusterTopologyCheckedException ignored) {
+                U.warn(log, "Failed to send update request to backup node because it left grid: " +
+                    req.nodeId());
+
+                registerResponse(req.nodeId());
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to send update request to backup node (did node leave the grid?): "
+                    + req.nodeId(), e);
+
+                registerResponse(req.nodeId());
+            }
+        }
         else
             onDone();
 


Mime
View raw message