ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [4/6] ignite git commit: Finished update.
Date Thu, 31 Mar 2016 13:27:08 GMT
Finished update.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/30b2dde9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/30b2dde9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/30b2dde9

Branch: refs/heads/ignite-2925
Commit: 30b2dde9ab55a64b62dfc455dce694aac2c89ccd
Parents: 8955c30
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Thu Mar 31 16:12:58 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Thu Mar 31 16:12:58 2016 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          | 126 ++++++++++++-------
 1 file changed, 83 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/30b2dde9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index e1ecb97..a681fcf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -82,7 +82,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
-import org.apache.ignite.internal.util.F0;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -445,14 +444,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @Override public IgniteInternalFuture<V> getAndPutAsync0(K key, V val, @Nullable CacheEntryPredicate... filter) {
         A.notNull(key, "key");
 
-        return updateAsync0(F0.asMap(key, val),
+        return updateAsync0(
+            key,
+            val,
             null,
             null,
             true,
-            false,
             filter,
-            true,
-            UPDATE);
+            true);
     }
 
     /** {@inheritDoc} */
@@ -460,28 +459,28 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     @Override public IgniteInternalFuture<Boolean> putAsync0(K key, V val, @Nullable CacheEntryPredicate... filter) {
         A.notNull(key, "key");
 
-        return updateAsync0(F0.asMap(key, val),
+        return updateAsync0(
+            key,
+            val,
             null,
             null,
             false,
-            false,
             filter,
-            true,
-            UPDATE);
+            true);
     }
 
     /** {@inheritDoc} */
     @Override public V tryPutIfAbsent(K key, V val) throws IgniteCheckedException {
         A.notNull(key, "key", val, "val");
 
-        return (V)updateAsync0(F0.asMap(key, val),
+        return (V)updateAsync0(
+            key,
+            val,
             null,
             null,
             true,
-            false,
             ctx.noValArray(),
-            false,
-            UPDATE).get();
+            false).get();
     }
 
     /** {@inheritDoc} */
@@ -747,21 +746,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
         if (keyCheck)
             validateCacheKey(key);
 
-        Map<? extends K, EntryProcessor> invokeMap =
-            Collections.singletonMap(key, (EntryProcessor)entryProcessor);
-
         CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
 
-        IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut = updateAsync0(null,
-            invokeMap,
+        IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut = updateAsync0(
+            key,
+            null,
+            entryProcessor,
             args,
             false,
-            false,
             null,
-            true,
-            TRANSFORM);
+            true);
 
         return fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() {
             @Override public EntryProcessorResult<T> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut)
@@ -858,19 +854,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             TRANSFORM);
     }
 
-    private IgniteInternalFuture updateAsync0(
-        @Nullable Map<? extends K, ? extends V> map,
-        @Nullable Map<? extends K, ? extends EntryProcessor> invokeMap,
-        @Nullable Object[] invokeArgs,
-        final boolean retval,
-        final boolean rawRetval,
-        @Nullable final CacheEntryPredicate[] filter,
-        final boolean waitTopFut,
-        final GridCacheOperation op
-    ) {
-        return updateAllAsync0(map, invokeMap, invokeArgs, null, null, retval, rawRetval, filter, waitTopFut, op);
-    }
-
     /**
      * Entry point for all public API put/transform methods.
      *
@@ -983,6 +966,67 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     }
 
     /**
+     * Entry point for update/invoke with a single key.
+     *
+     * @param key Key.
+     * @param val Value.
+     * @param proc Entry processor.
+     * @param invokeArgs Invoke arguments.
+     * @param retval Return value flag.
+     * @param filter Filter.
+     * @param waitTopFut Whether to wait for topology future.
+     * @return Future.
+     */
+    private IgniteInternalFuture updateAsync0(
+        K key,
+        @Nullable V val,
+        @Nullable EntryProcessor proc,
+        @Nullable Object[] invokeArgs,
+        final boolean retval,
+        @Nullable final CacheEntryPredicate[] filter,
+        final boolean waitTopFut
+    ) {
+        assert val == null || proc == null;
+
+        assert ctx.updatesAllowed();
+
+        validateCacheKey(key);
+
+        ctx.checkSecurity(SecurityPermission.CACHE_PUT);
+
+        CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+        final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
+            ctx,
+            this,
+            ctx.config().getWriteSynchronizationMode(),
+            val != null ? UPDATE : TRANSFORM,
+            Collections.singletonList(key),
+            val != null ? Collections.singletonList(val) : Collections.singletonList(proc),
+            invokeArgs,
+            null,
+            null,
+            retval,
+            false,
+            opCtx != null ? opCtx.expiry() : null,
+            filter,
+            ctx.subjectIdPerCall(null, opCtx),
+            ctx.kernalContext().job().currentTaskNameHash(),
+            opCtx != null && opCtx.skipStore(),
+            opCtx != null && opCtx.isKeepBinary(),
+            opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+            waitTopFut);
+
+        return asyncOp(new CO<IgniteInternalFuture<Object>>() {
+            @Override public IgniteInternalFuture<Object> apply() {
+                updateFut.map();
+
+                return updateFut;
+            }
+        });
+    }
+
+    /**
      * Entry point for remove with single key.
      *
      * @param key Key.
@@ -992,21 +1036,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     private IgniteInternalFuture removeAsync0(K key, final boolean retval,
         @Nullable final CacheEntryPredicate[] filter) {
-        assert ctx.updatesAllowed();
-
         final boolean statsEnabled = ctx.config().isStatisticsEnabled();
 
         final long start = statsEnabled ? System.nanoTime() : 0L;
 
+        assert ctx.updatesAllowed();
+
         validateCacheKey(key);
 
         ctx.checkSecurity(SecurityPermission.CACHE_REMOVE);
 
-        final CacheOperationContext opCtx = ctx.operationContextPerCall();
-
-        UUID subjId = ctx.subjectIdPerCall(null, opCtx);
-
-        int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
+        CacheOperationContext opCtx = ctx.operationContextPerCall();
 
         final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
             ctx,
@@ -1022,8 +1062,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             false,
             (filter != null && opCtx != null) ? opCtx.expiry() : null,
             filter,
-            subjId,
-            taskNameHash,
+            ctx.subjectIdPerCall(null, opCtx),
+            ctx.kernalContext().job().currentTaskNameHash(),
             opCtx != null && opCtx.skipStore(),
             opCtx != null && opCtx.isKeepBinary(),
             opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,


Mime
View raw message