ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [04/10] ignite git commit: ignite-4680 one-message-back
Date Fri, 17 Mar 2017 15:04:49 GMT
ignite-4680 one-message-back


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f33235db
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f33235db
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f33235db

Branch: refs/heads/ignite-4680-sb
Commit: f33235dbda1b3081468b2d00a5337d27f77aa930
Parents: d8fa6dd
Author: Konstantin Dudkov <kdudkov@ya.ru>
Authored: Thu Mar 16 13:10:06 2017 +0300
Committer: Konstantin Dudkov <kdudkov@ya.ru>
Committed: Thu Mar 16 13:10:06 2017 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   |  5 ++
 .../dht/atomic/GridDhtAtomicCache.java          | 10 ++-
 .../GridNearAtomicAbstractUpdateFuture.java     | 62 ++++----------
 .../GridNearAtomicAbstractUpdateRequest.java    | 19 +++++
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 18 ++--
 .../atomic/GridNearAtomicUpdateResponse.java    | 17 +---
 .../dht/atomic/NearAtomicResponseHelper.java    | 86 ++++++++++++++++++++
 7 files changed, 140 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f33235db/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 5fd2845..39c514b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeployment;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.NearAtomicResponseHelper;
 import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
 import org.apache.ignite.internal.processors.pool.PoolProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
@@ -829,6 +830,10 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 ((GridNearAtomicFullUpdateRequest)msg.message()).stripeMap() : null;
 
             if (stripemap != null) {
+                GridNearAtomicFullUpdateRequest msg0 = ((GridNearAtomicFullUpdateRequest)msg.message());
+
+                msg0.responseHelper(new NearAtomicResponseHelper(stripemap.keySet()));
+
                 for (Integer stripe : stripemap.keySet()) {
                     stripedExecutor.execute(stripe, c);
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f33235db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index aeb379a..05d85ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -212,8 +212,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
         updateReplyClos = new UpdateReplyClosure() {
             @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
             @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse
res) {
-                if (req.writeSynchronizationMode() != FULL_ASYNC)
-                    sendNearUpdateReply(res.nodeId(), res);
+                if (req.writeSynchronizationMode() != FULL_ASYNC) {
+                    if (req.responseHelper() != null) {
+                        if (req.responseHelper().addResponse(res))
+                            sendNearUpdateReply(res.nodeId(), req.responseHelper().response());
+                    }
+                    else
+                        sendNearUpdateReply(res.nodeId(), res);
+                }
                 else {
                     if (res.remapTopologyVersion() != null)
                         // Remap keys on primary node in FULL_ASYNC mode.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f33235db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 770999a..2d02795 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -357,6 +356,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      * @return Response to notify about primary failure.
      */
     final GridNearAtomicUpdateResponse primaryFailedResponse(GridNearAtomicAbstractUpdateRequest
req) {
+//        assert req == null : req;
         assert req.nodeId() != null : req;
 
         if (msgLog.isDebugEnabled()) {
@@ -426,9 +426,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         private GridNearAtomicUpdateResponse res;
 
         /** */
-        private Map<Integer, GridNearAtomicUpdateResponse> resMap;
-
-        /** */
         @GridToStringInclude
         Set<UUID> dhtNodes;
 
@@ -437,7 +434,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         private Set<UUID> rcvd;
 
         /** */
-        private boolean hasDhtRes;
+        private boolean hasRes;
 
         /**
          * @param req Request.
@@ -522,24 +519,13 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         }
 
         /**
-         * @return {@code True} if all near results gathered.
-         */
-        private boolean hasAllNearResults() {
-            return res != null || (resMap != null && resMap.size() == req.stripes());
-        }
-
-        public String state() {
-            return resMap != null ? resMap.size() + "/" + req.stripes() : res != null ? "res"
: "no";
-        }
-
-        /**
          * @return {@code True} if all expected responses are received.
          */
         private boolean finished() {
             if (req.writeSynchronizationMode() == PRIMARY_SYNC)
-                return hasAllNearResults();
+                return hasRes;
 
-            return (dhtNodes != null && dhtNodes.isEmpty()) && hasDhtRes;
+            return (dhtNodes != null && dhtNodes.isEmpty()) && hasRes;
         }
 
         /**
@@ -559,7 +545,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
                 return req;
             }
 
-            return hasAllNearResults() ? req : null;
+            return this.res == null ? req : null;
         }
 
         /**
@@ -576,7 +562,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             if (finished())
                 return null;
 
-            return !hasAllNearResults() ? req : null;
+            return this.res == null ? req : null;
         }
 
         /**
@@ -588,7 +574,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
                 return DhtLeftResult.NOT_DONE;
 
             if (dhtNodes.remove(nodeId) && dhtNodes.isEmpty()) {
-                if (hasDhtRes)
+                if (hasRes)
                     return DhtLeftResult.DONE;
                 else
                     return !req.needPrimaryResponse() ? DhtLeftResult.ALL_RCVD_CHECK_PRIMARY
: DhtLeftResult.NOT_DONE;
@@ -609,7 +595,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
                 return false;
 
             if (res.hasResult())
-                hasDhtRes = true;
+                hasRes = true;
 
             if (dhtNodes == null) {
                 if (rcvd == null)
@@ -631,6 +617,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         boolean onPrimaryResponse(GridNearAtomicUpdateResponse res, GridCacheContext cctx)
{
             assert !finished() : this;
 
+            hasRes = true;
+
             boolean onRes = storeResponse(res);
 
             assert onRes;
@@ -654,7 +642,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
          * @param cctx Context.
          */
         private void initDhtNodes(List<UUID> nodeIds, GridCacheContext cctx) {
-            assert F.isEmpty(dhtNodes) || req.initMappingLocally();
+            assert dhtNodes == null || req.initMappingLocally();
 
             Set<UUID> dhtNodes0 = dhtNodes;
 
@@ -691,22 +679,10 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
          * @return {@code True} if current response was {@code null}.
          */
         private boolean storeResponse(GridNearAtomicUpdateResponse res) {
-            if (res.stripe() > -1) {
-                if (resMap == null)
-                    resMap = U.newHashMap(req.stripes());
+            if (this.res == null) {
+                this.res = res;
 
-                if (!resMap.containsKey(res.stripe())) {
-                    resMap.put(res.stripe(), res);
-                    return true;
-                }
-            }
-            else {
-                if (this.res == null) {
-                    this.res = res;
-//                    this.resMap = null;
-
-                    return true;
-                }
+                return true;
             }
 
             return false;
@@ -717,7 +693,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
          */
         void resetResponse() {
             res = null;
-            resMap = null;
         }
 
         /**
@@ -727,19 +702,12 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
             return res;
         }
 
-        /**
-         * @return Response.
-         */
-        @Nullable public Map<Integer, GridNearAtomicUpdateResponse> responses() {
-            return resMap;
-        }
-
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(PrimaryRequestState.class, this,
                 "primary", primaryId(),
                 "needPrimaryRes", req.needPrimaryResponse(),
-                "primaryRes", this.res != null,
+                "primaryRes", res != null,
                 "done", finished());
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f33235db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index b549370..0748434 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -89,6 +89,10 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     @GridToStringExclude
     protected byte flags;
 
+    /** Response helper. */
+    @GridDirectTransient
+    private NearAtomicResponseHelper responseHelper;
+
     /**
      *
      */
@@ -419,6 +423,21 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
     }
 
     /**
+     * @return Response helper.
+     */
+    public NearAtomicResponseHelper responseHelper() {
+        return responseHelper;
+    }
+
+    /**
+     * @param responseHelper Response helper.
+     */
+    public void responseHelper(
+        NearAtomicResponseHelper responseHelper) {
+        this.responseHelper = responseHelper;
+    }
+
+    /**
      * @param idx Key index.
      * @return Key.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f33235db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 9c4de66..8774d5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -371,7 +371,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         GridCacheReturn opRes0 = null;
         CachePartialUpdateCheckedException err0 = null;
 
-        boolean rcvAll = false;
+        boolean rcvAll;
 
         synchronized (mux) {
             if (futId == -1 || futId != res.futureId())
@@ -412,8 +412,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
                     if (msgLog.isDebugEnabled())
                         msgLog.debug("Processed near atomic update response " +
                             "[futId=" + res.futureId() + ", node=" + nodeId + ", stripe="
+ res.stripe() +
-                            ": " + resCnt + "/" + mappings.size() + (rcvAll ? " all done"
: "") +
-                            " " + reqState.state() + ']');
+                            ": " + resCnt + "/" + mappings.size() + (rcvAll ? " all done"
: "") + ']');
                 }
                 else
                     return;
@@ -472,18 +471,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         if (rcvAll && nearEnabled) {
             if (mappings != null) {
                 for (PrimaryRequestState reqState : mappings.values()) {
-                    if (reqState.responses() != null) {
-                        for (GridNearAtomicUpdateResponse res0 : reqState.responses().values())
{
-                            updateNear(reqState.req, res0);
-                        }
-                    }
-                    else {
-                        GridNearAtomicUpdateResponse res0 = reqState.response();
+                    GridNearAtomicUpdateResponse res0 = reqState.response();
 
-                        assert res0 != null : reqState;
+                    assert res0 != null : reqState;
 
-                        updateNear(reqState.req, res0);
-                    }
+                    updateNear(reqState.req, res0);
                 }
             }
             else if (!nodeErr)

http://git-wip-us.apache.org/repos/asf/ignite/blob/f33235db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index f8b45d3..f0f954c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -97,6 +97,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements
Gr
     private int partId = -1;
 
     /** Stripe. */
+    @GridDirectTransient
     private int stripe = -1;
 
     /** */
@@ -548,12 +549,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements
Gr
 
                 writer.incrementState();
 
-            case 15:
-                if (!writer.writeInt("stripe", stripe))
-                    return false;
-
-                writer.incrementState();
-
         }
 
         return true;
@@ -666,14 +661,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements
Gr
 
                 reader.incrementState();
 
-            case 15:
-                stripe = reader.readInt("stripe");
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
         }
 
         return reader.afterMessageRead(GridNearAtomicUpdateResponse.class);
@@ -686,7 +673,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements
Gr
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 16;
+        return 15;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/f33235db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
new file mode 100644
index 0000000..9e35e8f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/NearAtomicResponseHelper.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ *
+ */
+public class NearAtomicResponseHelper {
+
+    /** */
+    private GridNearAtomicUpdateResponse res;
+
+    /** */
+    private Set<Integer> stripes;
+
+    /**
+     * @param stripes Stripes collection.
+     */
+    public NearAtomicResponseHelper(Set<Integer> stripes) {
+        this.stripes = new HashSet<>(stripes);
+    }
+
+    /**
+     * @param res Response.
+     * @return {@code true} if all responses added.
+     */
+    public boolean addResponse(GridNearAtomicUpdateResponse res) {
+        synchronized (this) {
+            if (res.stripe() == -1) {
+                this.res = res;
+                this.res.stripe(-1);
+
+                return true;
+            }
+
+            if (stripes.remove(res.stripe())) {
+                if (this.res == null)
+                    this.res = res;
+                else {
+                    if (res.nearValuesIndexes() != null)
+                        for (int i = 0; i < res.nearValuesIndexes().size(); i++)
+                            this.res.addNearValue(
+                                res.nearValuesIndexes().get(i),
+                                res.nearValue(i),
+                                res.nearTtl(i),
+                                res.nearExpireTime(i)
+                            );
+
+                    if (res.failedKeys() != null)
+                        this.res.addFailedKeys(res.failedKeys(), null);
+
+                    if (res.skippedIndexes() != null)
+                        this.res.skippedIndexes().addAll(res.skippedIndexes());
+                }
+                return stripes.isEmpty();
+            }
+
+            return false;
+        }
+    }
+
+    /**
+     * @return Response.
+     */
+    public GridNearAtomicUpdateResponse response() {
+        return res;
+    }
+}


Mime
View raw message