ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-4705
Date Mon, 27 Feb 2017 16:07:30 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-4705 fef18e15a -> 71a11ea84


ignite-4705


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/71a11ea8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/71a11ea8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/71a11ea8

Branch: refs/heads/ignite-4705
Commit: 71a11ea84f3bc7a316c1cca1c252923cbbd000ad
Parents: fef18e1
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Feb 27 19:07:23 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Feb 27 19:07:23 2017 +0300

----------------------------------------------------------------------
 .../GridDhtAtomicAbstractUpdateFuture.java      | 23 ++---
 .../dht/atomic/GridDhtAtomicCache.java          |  4 +-
 .../dht/atomic/GridDhtAtomicNearResponse.java   |  9 ++
 .../dht/atomic/GridDhtAtomicUpdateResponse.java | 93 ++++++--------------
 .../GridNearAtomicSingleUpdateFuture.java       | 17 +++-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 10 +++
 6 files changed, 73 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/71a11ea8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 5bbbc11..e9b6b62 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -297,7 +297,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
 
     /** {@inheritDoc} */
     @Override public final boolean onNodeLeft(UUID nodeId) {
-        boolean res = registerResponse(nodeId, true);
+        boolean res = registerResponse(nodeId, true, null);
 
         if (res && msgLog.isDebugEnabled()) {
             msgLog.debug("DTH update fut, node left [futId=" + futId + ", writeVer=" + writeVer +
@@ -312,7 +312,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
      * @param nodeErr Node error flag.
      * @return {@code True} if request found.
      */
-    private boolean registerResponse(UUID nodeId, boolean nodeErr) {
+    private boolean registerResponse(UUID nodeId, boolean nodeErr, UpdateErrors errors) {
         int resCnt0;
 
         GridDhtAtomicAbstractUpdateRequest req = mappings != null ? mappings.get(nodeId) : null;
@@ -322,7 +322,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         if (req != null) {
             synchronized (this) {
                 if (req.onResponse()) {
-                    if (nodeErr && !repliedToNear)
+                    if (errors != null || (nodeErr && !repliedToNear))
                         needReplyToNear = repliedToNear = true;
 
                     resCnt0 = resCnt;
@@ -349,6 +349,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                     dhtNodes,
                     req.flags());
 
+                res.errors(errors);
+
                 res.failedNodeId(nodeId);
 
                 try {
@@ -457,7 +459,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
     }
 
     /**
-     * @param nearReplyInfo {@code True} if need add inforamtion for near node response.
+     * @param nearReplyInfo {@code True} if need add information for near node response.
      * @param dhtNodes DHT nodes.
      * @param ret Return value.
      */
@@ -486,24 +488,23 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
                         ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
                 }
 
-                registerResponse(req.nodeId(), true);
+                registerResponse(req.nodeId(), true, null);
             }
             catch (IgniteCheckedException ignored) {
                 U.error(msgLog, "Failed to send request [futId=" + futId +
                     ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
 
-                registerResponse(req.nodeId(), true);
+                registerResponse(req.nodeId(), true, null);
             }
         }
     }
 
     /**
      * @param nodeId Node ID.
-     * @param res Response.
+     * @param errors Response.
      */
-    public final void onDhtErrorResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) {
-        // TODO IGNITE-4705.
-        assert false;
+    public final void onDhtErrorResponse(UUID nodeId, UpdateErrors errors) {
+        registerResponse(nodeId, false, errors);
     }
 
     /**
@@ -515,7 +516,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
         if (log.isDebugEnabled())
             log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
 
-        registerResponse(nodeId, false);
+        registerResponse(nodeId, false, null);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/71a11ea8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index fcb17f2..97be512 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -3649,7 +3649,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             }
         }
 
-        if (res.error() != null) {
+        if (res.errors() != null) {
             GridDhtAtomicAbstractUpdateFuture updateFut =
                 (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
 
@@ -3659,7 +3659,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         ", writeVer=" + updateFut.writeVersion() + ", node=" + nodeId + ']');
                 }
 
-                updateFut.onDhtErrorResponse(nodeId, res);
+                updateFut.onDhtErrorResponse(nodeId, res.errors());
             }
             else {
                 U.warn(msgLog, "Failed to find DHT update future for update response [futId=" + res.futureId() +

http://git-wip-us.apache.org/repos/asf/ignite/blob/71a11ea8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
index 25a64f0..e494288 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_HAS_RESULT_MASK;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE;
@@ -101,6 +102,14 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
         this.flags = flags;
     }
 
+    @Nullable UpdateErrors errors() {
+        return errors;
+    }
+
+    void errors(UpdateErrors errors) {
+        this.errors = errors;
+    }
+
     /**
      * @return Failed node ID.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/71a11ea8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index 6d5cef3..b1a46d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * DHT atomic cache backup update response.
@@ -51,17 +52,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
     /** Future version. */
     private long futId;
 
-    /** Failed keys. */
-    @GridToStringInclude
-    @GridDirectCollection(KeyCacheObject.class)
-    private List<KeyCacheObject> failedKeys;
-
-    /** Update error. */
-    @GridDirectTransient
-    private IgniteCheckedException err;
-
-    /** Serialized update error. */
-    private byte[] errBytes;
+    /** */
+    private UpdateErrors errors;
 
     /** Evicted readers. */
     @GridToStringInclude
@@ -91,6 +83,13 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
         this.addDepInfo = addDepInfo;
     }
 
+    /**
+     * @return Errors.
+     */
+    @Nullable UpdateErrors errors() {
+        return errors;
+    }
+
     /** {@inheritDoc} */
     @Override public int lookupIndex() {
         return CACHE_MSG_IDX;
@@ -109,37 +108,15 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
      * @param err Error.
      */
     public void onError(IgniteCheckedException err){
-        this.err = err;
+        if (errors == null)
+            errors = new UpdateErrors();
+
+        errors.onError(err);
     }
 
     /** {@inheritDoc} */
     @Override public IgniteCheckedException error() {
-        return err;
-    }
-
-    /**
-     * @return Failed keys.
-     */
-    public Collection<KeyCacheObject> failedKeys() {
-        return failedKeys;
-    }
-
-    /**
-     * Adds key to collection of failed keys.
-     *
-     * @param key Key to add.
-     * @param e Error cause.
-     */
-    public void addFailedKey(KeyCacheObject key, Throwable e) {
-        if (failedKeys == null)
-            failedKeys = new ArrayList<>();
-
-        failedKeys.add(key);
-
-        if (err == null)
-            err = new IgniteCheckedException("Failed to update keys on primary node.");
-
-        err.addSuppressed(e);
+        return errors != null ? errors.error() : null;
     }
 
     /**
@@ -179,12 +156,10 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
 
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
-        prepareMarshalCacheObjects(failedKeys, cctx);
-
         prepareMarshalCacheObjects(nearEvicted, cctx);
 
-        if (errBytes == null)
-            errBytes = U.marshal(ctx, err);
+        if (errors != null)
+            errors.prepareMarshal(this, cctx);
     }
 
     /** {@inheritDoc} */
@@ -193,12 +168,10 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
 
         GridCacheContext cctx = ctx.cacheContext(cacheId);
 
-        finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
-
         finishUnmarshalCacheObjects(nearEvicted, cctx, ldr);
 
-        if (errBytes != null && err == null)
-            err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+        if (errors != null)
+            errors.finishUnmarshal(this, cctx, ldr);
     }
 
     /** {@inheritDoc} */
@@ -227,30 +200,24 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
 
         switch (writer.state()) {
             case 3:
-                if (!writer.writeByteArray("errBytes", errBytes))
+                if (!writer.writeMessage("errors", errors))
                     return false;
 
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-            case 5:
                 if (!writer.writeLong("futId", futId))
                     return false;
 
                 writer.incrementState();
 
-            case 6:
+            case 5:
                 if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
-            case 7:
+            case 6:
                 if (!writer.writeInt("partId", partId))
                     return false;
 
@@ -273,7 +240,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
 
         switch (reader.state()) {
             case 3:
-                errBytes = reader.readByteArray("errBytes");
+                errors = reader.readMessage("errors");
 
                 if (!reader.isLastRead())
                     return false;
@@ -281,14 +248,6 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
                 reader.incrementState();
 
             case 4:
-                failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 5:
                 futId = reader.readLong("futId");
 
                 if (!reader.isLastRead())
@@ -296,7 +255,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 6:
+            case 5:
                 nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -304,7 +263,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
 
                 reader.incrementState();
 
-            case 7:
+            case 6:
                 partId = reader.readInt("partId");
 
                 if (!reader.isLastRead())
@@ -324,7 +283,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 8;
+        return 7;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/71a11ea8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 6a83c6e..7a18328 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -250,6 +250,16 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 return;
         }
 
+        UpdateErrors errors = res.errors();
+
+        if (errors != null) {
+            assert errors.error() != null;
+
+            onDone(errors.error());
+
+            return;
+        }
+
         finishUpdateFuture(opRes0, err0, remapTopVer0);
     }
 
@@ -258,7 +268,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
     @Override public void onPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
         GridNearAtomicAbstractUpdateRequest req;
 
-        AffinityTopologyVersion remapTopVer0 = null;
+        AffinityTopologyVersion remapTopVer0;
 
         GridCacheReturn opRes0 = null;
         CachePartialUpdateCheckedException err0 = null;
@@ -279,11 +289,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
 
                 assert remapTopVer == null : remapTopVer;
 
-                remapTopVer = remapTopVer0 = res.remapTopologyVersion();
+                remapTopVer = res.remapTopologyVersion();
             }
             else if (res.error() != null) {
-                // TODO IGNITE-4705: assert only 1 key?
                 if (res.failedKeys() != null) {
+                    assert res.failedKeys().size() == 1 : res.failedKeys();
+
                     if (err == null)
                         err = new CachePartialUpdateCheckedException(
                             "Failed to update keys (retry update if possible).");

http://git-wip-us.apache.org/repos/asf/ignite/blob/71a11ea8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 7e035d0..7ce4ebd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -381,6 +381,16 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
             }
         }
 
+        UpdateErrors errors = res.errors();
+
+        if (errors != null) {
+            assert errors.error() != null;
+
+            onDone(errors.error());
+
+            return;
+        }
+
         finishUpdateFuture(opRes0, err0, remapTopVer0);
     }
 


Mime
View raw message