Repository: ignite
Updated Branches:
refs/heads/ignite-2523-1-resp-dht [created] bb97c217c
Simplifying DHT request/response APIs.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9833dab5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9833dab5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9833dab5
Branch: refs/heads/ignite-2523-1-resp-dht
Commit: 9833dab563e6b33cc3856e5e089eaf6a22bde4d8
Parents: f163aba
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Thu Apr 28 14:45:37 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Thu Apr 28 14:45:37 2016 +0300
----------------------------------------------------------------------
.../dht/atomic/GridDhtAtomicCache.java | 2 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 33 ++++++++++++--------
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 11 ++++---
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 33 +++++++++++++++-----
.../distributed/near/GridNearAtomicCache.java | 5 +--
5 files changed, 55 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9833dab5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 2c0417c..a093fa5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -3066,7 +3066,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId,
req, res);
try {
- if (res.failedKeys() != null || res.nearEvicted() != null || req.writeSynchronizationMode()
== FULL_SYNC)
+ if (res.failedCount() > 0 || res.nearEvictedCount() > 0 || req.writeSynchronizationMode()
== FULL_SYNC)
ctx.io().send(nodeId, res, ctx.ioPolicy());
else {
// No failed keys and sync mode is not FULL_SYNC, thus sending deferred response.
http://git-wip-us.apache.org/repos/asf/ignite/blob/9833dab5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 82238e0..0043bf1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
@@ -43,7 +44,6 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
@@ -412,24 +412,31 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
* @param nodeId Backup node ID.
* @param updateRes Update response.
*/
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) {
if (log.isDebugEnabled())
log.debug("Received DHT atomic update future result [nodeId=" + nodeId + ", updateRes="
+ updateRes + ']');
- if (updateRes.error() != null)
- this.updateRes.addFailedKeys(updateRes.failedKeys(), updateRes.error());
+ if (updateRes.error() != null) {
+ List<KeyCacheObject> failed = new ArrayList<>(updateRes.failedCount());
- if (!F.isEmpty(updateRes.nearEvicted())) {
- for (KeyCacheObject key : updateRes.nearEvicted()) {
- GridDhtCacheEntry entry = nearReadersEntries.get(key);
+ for (int i = 0; i < updateRes.failedCount(); i++)
+ failed.add(updateRes.failed(i));
- try {
- entry.removeReader(nodeId, updateRes.messageId());
- }
- catch (GridCacheEntryRemovedException e) {
- if (log.isDebugEnabled())
- log.debug("Entry with evicted reader was removed [entry=" + entry
+ ", err=" + e + ']');
- }
+ this.updateRes.addFailedKeys(failed, updateRes.error());
+ }
+
+ for (int i = 0; i < updateRes.nearEvictedCount(); i++) {
+ KeyCacheObject key = updateRes.nearEvicted(i);
+
+ GridDhtCacheEntry entry = nearReadersEntries.get(key);
+
+ try {
+ entry.removeReader(nodeId, updateRes.messageId());
+ }
+ catch (GridCacheEntryRemovedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Entry with evicted reader was removed [entry=" + entry + ",
err=" + e + ']');
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9833dab5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 6b050b1..ed275ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.UUID;
import javax.cache.processor.EntryProcessor;
@@ -38,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -442,10 +442,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements
Grid
}
/**
- * @return Keys.
+ * Check whether the followin key exist.
+ *
+ * @param key Key.
+ * @return {@code True} if exist.
*/
- public Collection<KeyCacheObject> keys() {
- return keys;
+ public boolean hasKey(KeyCacheObject key) {
+ return F.contains(keys, key);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/9833dab5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index 383e515..7099a4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectCollection;
@@ -113,10 +112,20 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements
Gri
}
/**
- * @return Failed keys.
+ * @return Amount of failed keys.
*/
- public Collection<KeyCacheObject> failedKeys() {
- return failedKeys;
+ public int failedCount() {
+ return failedKeys != null ? failedKeys.size() : 0;
+ }
+
+ /**
+ * Return failed key.
+ *
+ * @param idx Index.
+ * @return Failed key.
+ */
+ public KeyCacheObject failed(int idx) {
+ return failedKeys.get(idx);
}
/**
@@ -138,10 +147,20 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements
Gri
}
/**
- * @return Evicted readers.
+ * @return Amount of near evicted keys.
+ */
+ public int nearEvictedCount() {
+ return nearEvicted != null ? nearEvicted.size() : 0;
+ }
+
+ /**
+ * Return near evicted key.
+ *
+ * @param idx Index.
+ * @return Near evicted key.
*/
- public Collection<KeyCacheObject> nearEvicted() {
- return nearEvicted;
+ public KeyCacheObject nearEvicted(int idx) {
+ return nearEvicted.get(idx);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/9833dab5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 97be2eb..febf0b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import java.io.Externalizable;
import java.util.Collection;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -314,8 +313,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K,
V> {
assert ver != null;
- Collection<KeyCacheObject> backupKeys = req.keys();
-
boolean intercept = req.forceTransformBackups() && ctx.config().getInterceptor()
!= null;
String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
@@ -334,7 +331,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K,
V> {
break;
}
- if (F.contains(backupKeys, key)) { // Reader became backup.
+ if (req.hasKey(key)) { // Reader became backup.
if (entry.markObsolete(ver))
removeEntry(entry);
|