ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [2/3] ignite git commit: Refactored keys and vals collections.
Date Fri, 01 Apr 2016 06:55:01 GMT
Refactored keys and vals collections.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f3b4fa26
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f3b4fa26
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f3b4fa26

Branch: refs/heads/ignite-2926
Commit: f3b4fa26319796baf4b94d7166ac49d262b3cf6d
Parents: 2776cca
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Thu Mar 31 17:08:46 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Thu Mar 31 17:08:46 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAtomicFuture.java |   5 -
 .../dht/atomic/GridDhtAtomicCache.java          |  12 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   5 -
 .../GridNearAtomicSingleUpdateFuture.java       | 174 +++++++------------
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   5 -
 5 files changed, 71 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f3b4fa26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index 359909e..c96d00f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -38,9 +38,4 @@ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
      * @return Future or {@code null} if no need to wait.
      */
     public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer);
-
-    /**
-     * @return Future keys.
-     */
-    public Collection<?> keys();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/f3b4fa26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 00680ec..3ae43db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1063,19 +1063,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         boolean waitTopFut
     ) {
         GridCacheOperation op;
-        Collection vals;
+        Object updVal;
 
         if (val != null) {
             op = UPDATE;
-            vals = Collections.singletonList(val);
+            updVal = val;
         }
         else if (proc != null) {
             op = TRANSFORM;
-            vals = Collections.singletonList(proc);
+            updVal = proc;
         }
         else {
             op = DELETE;
-            vals = null;
+            updVal = null;
         }
 
         CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -1085,8 +1085,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             this,
             ctx.config().getWriteSynchronizationMode(),
             op,
-            Collections.singletonList(key),
-            vals,
+            key,
+            updVal,
             invokeArgs,
             retval,
             opCtx != null ? opCtx.expiry() : null,

http://git-wip-us.apache.org/repos/asf/ignite/blob/f3b4fa26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 9f52658..4721d6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -212,11 +212,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
         return null;
     }
 
-    /** {@inheritDoc} */
-    @Override public Collection<KeyCacheObject> keys() {
-        return keys;
-    }
-
     /**
      * @param entry Entry to map.
      * @param val Value to write.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f3b4fa26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 4112308..8bb6ebe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -55,7 +55,6 @@ import javax.cache.expiry.ExpiryPolicy;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -64,6 +63,7 @@ import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
 import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
 
 /**
@@ -87,11 +87,11 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object>
     private final GridCacheOperation op;
 
     /** Keys */
-    private Collection<?> keys;
+    private Object key;
 
     /** Values. */
     @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
-    private Collection<?> vals;
+    private Object val;
 
     /** Optional arguments for entry processor. */
     private Object[] invokeArgs;
@@ -143,8 +143,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object>
      * @param cache Cache instance.
      * @param syncMode Write synchronization mode.
      * @param op Update operation.
-     * @param keys Keys to update.
-     * @param vals Values or transform closure.
+     * @param key Keys to update.
+     * @param val Values or transform closure.
      * @param invokeArgs Optional arguments for entry processor.
      * @param retval Return value require flag.
      * @param expiryPlc Expiry policy explicitly specified for cache operation.
@@ -161,8 +161,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object>
         GridDhtAtomicCache cache,
         CacheWriteSynchronizationMode syncMode,
         GridCacheOperation op,
-        Collection<?> keys,
-        @Nullable Collection<?> vals,
+        Object key,
+        @Nullable Object val,
         @Nullable Object[] invokeArgs,
         final boolean retval,
         @Nullable ExpiryPolicy expiryPlc,
@@ -174,15 +174,14 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object>
         int remapCnt,
         boolean waitTopFut
     ) {
-        assert vals == null || vals.size() == keys.size();
         assert subjId != null;
 
         this.cctx = cctx;
         this.cache = cache;
         this.syncMode = syncMode;
         this.op = op;
-        this.keys = keys;
-        this.vals = vals;
+        this.key = key;
+        this.val = val;
         this.invokeArgs = invokeArgs;
         this.retval = retval;
         this.expiryPlc = expiryPlc;
@@ -229,11 +228,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object>
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<?> keys() {
-        return keys;
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
         state.onNodeLeft(nodeId);
 
@@ -818,8 +812,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object>
             GridNearAtomicUpdateRequest singleReq0 = null;
             Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
 
-            int size = keys.size();
-
             GridCacheVersion futVer = cctx.versions().next(topVer);
 
             GridCacheVersion updVer;
@@ -839,7 +831,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object>
                 updVer = null;
 
             try {
-                if (size == 1 && !fastMap) {
+                if (!fastMap) {
                     assert remapKeys == null || remapKeys.size() == 1;
 
                     singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
@@ -865,7 +857,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object>
                         else
                             mappings0 = pendingMappings;
 
-                        assert !mappings0.isEmpty() || size == 0 : GridNearAtomicSingleUpdateFuture.this;
+                        assert !mappings0.isEmpty() : GridNearAtomicSingleUpdateFuture.this;
                     }
                 }
 
@@ -909,10 +901,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object>
             else {
                 assert mappings0 != null;
 
-                if (size == 0)
-                    onDone(new GridCacheReturn(cctx, true, true, null, true));
-                else
-                    doUpdate(mappings0);
+                doUpdate(mappings0);
             }
         }
 
@@ -972,93 +961,68 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object>
             GridCacheVersion futVer,
             @Nullable GridCacheVersion updVer,
             @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
-            Iterator<?> it = null;
-
-            if (vals != null)
-                it = vals.iterator();
-
             Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
 
-            // Create mappings first, then send messages.
-            for (Object key : keys) {
-                if (key == null)
-                    throw new NullPointerException("Null key.");
-
-                Object val;
-
-                if (vals != null) {
-                    val = it.next();
-
-                    if (val == null)
-                        throw new NullPointerException("Null value.");
-                }
-                else {
-                    val = null;
-                }
-
-                if (val == null && op != GridCacheOperation.DELETE)
-                    continue;
-
+            if (val != null || op == DELETE) {
                 KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
 
-                if (remapKeys != null && !remapKeys.contains(cacheKey))
-                    continue;
-
-                if (op != TRANSFORM)
-                    val = cctx.toCacheObject(val);
-
-                Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
-
-                if (affNodes.isEmpty())
-                    throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
-                        "(all partition nodes left the grid).");
+                if (remapKeys == null || remapKeys.contains(cacheKey)) {
+                    if (op != TRANSFORM)
+                        val = cctx.toCacheObject(val);
 
-                int i = 0;
+                    Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
 
-                for (ClusterNode affNode : affNodes) {
-                    if (affNode == null)
+                    if (affNodes.isEmpty())
                         throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
                             "(all partition nodes left the grid).");
 
-                    UUID nodeId = affNode.id();
-
-                    GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
-
-                    if (mapped == null) {
-                        mapped = new GridNearAtomicUpdateRequest(
-                            cctx.cacheId(),
-                            nodeId,
-                            futVer,
-                            fastMap,
-                            updVer,
-                            topVer,
-                            topLocked,
-                            syncMode,
-                            op,
-                            retval,
-                            expiryPlc,
-                            invokeArgs,
-                            filter,
-                            subjId,
-                            taskNameHash,
-                            skipStore,
-                            keepBinary,
-                            cctx.kernalContext().clientNode(),
-                            cctx.deploymentEnabled(),
-                            keys.size());
-
-                        pendingMappings.put(nodeId, mapped);
-                    }
+                    int i = 0;
+
+                    for (ClusterNode affNode : affNodes) {
+                        if (affNode == null)
+                            throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+                                "(all partition nodes left the grid).");
+
+                        UUID nodeId = affNode.id();
+
+                        GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
+
+                        if (mapped == null) {
+                            mapped = new GridNearAtomicUpdateRequest(
+                                cctx.cacheId(),
+                                nodeId,
+                                futVer,
+                                fastMap,
+                                updVer,
+                                topVer,
+                                topLocked,
+                                syncMode,
+                                op,
+                                retval,
+                                expiryPlc,
+                                invokeArgs,
+                                filter,
+                                subjId,
+                                taskNameHash,
+                                skipStore,
+                                keepBinary,
+                                cctx.kernalContext().clientNode(),
+                                cctx.deploymentEnabled(),
+                                1);
+
+                            pendingMappings.put(nodeId, mapped);
+                        }
 
-                    mapped.addUpdateEntry(cacheKey,
-                        val,
-                        CU.TTL_NOT_CHANGED,
-                        CU.EXPIRE_TIME_CALCULATE,
-                        null,
-                        i == 0
-                    );
+                        mapped.addUpdateEntry(cacheKey,
+                            val,
+                            CU.TTL_NOT_CHANGED,
+                            CU.EXPIRE_TIME_CALCULATE,
+                            null,
+                            i == 0
+                        );
 
-                    i++;
+                        i++;
+                    }
                 }
             }
 
@@ -1075,16 +1039,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object>
         private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
             GridCacheVersion futVer,
             @Nullable GridCacheVersion updVer) throws Exception {
-            Object key = F.first(keys);
-
-            Object val;
-
-            if (vals != null)
-                // Regular PUT.
-                val = F.first(vals);
-            else
-                // Regular REMOVE.
-                val = null;
+            Object key = GridNearAtomicSingleUpdateFuture.this.key;
+            Object val = GridNearAtomicSingleUpdateFuture.this.val;
 
             // We still can get here if user pass map with single element.
             if (key == null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/f3b4fa26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 69e6274..c33df07 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -251,11 +251,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<?> keys() {
-        return keys;
-    }
-
-    /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
         state.onNodeLeft(nodeId);
 


Mime
View raw message