ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [2/3] ignite git commit: 'Single get' operation optimization.
Date Wed, 18 Nov 2015 09:13:37 GMT
'Single get' operation optimization.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5ffcb554
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5ffcb554
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5ffcb554

Branch: refs/heads/ignite-single-op-get
Commit: 5ffcb55462f8c928c41eec40a85e58f20940547b
Parents: 81d060d
Author: sboikov <sboikov@gridgain.com>
Authored: Wed Nov 18 11:47:44 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Wed Nov 18 12:06:36 2015 +0300

----------------------------------------------------------------------
 .../dht/GridPartitionedSingleGetFuture.java     |   2 +-
 .../dht/colocated/GridDhtColocatedCache.java    |  55 +++++++-
 .../distributed/near/GridNearGetFuture.java     |  13 +-
 .../cache/distributed/near/GridNearTxLocal.java | 141 +++++++++++++------
 .../transactions/IgniteTxLocalAdapter.java      |   2 +-
 5 files changed, 161 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5ffcb554/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 7e15bbf..b0e9714 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -427,7 +427,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object>
im
                 if (skipVals)
                     setSkipValueResult(true, verVal.version());
                 else
-                    setResult(null , null);
+                    setResult(verVal.value() , verVal.version());
             }
             else {
                 if (skipVals)

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ffcb554/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index a8b36e2..662bcdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -355,6 +355,54 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
     }
 
     /**
+     * @param key Key to load.
+     * @param readThrough Read through flag.
+     * @param forcePrimary Force get from primary node flag.
+     * @param topVer Topology version.
+     * @param subjId Subject ID.
+     * @param taskName Task name.
+     * @param deserializePortable Deserialize portable flag.
+     * @param expiryPlc Expiry policy.
+     * @param skipVals Skip values flag.
+     * @param canRemap Flag indicating whether future can be remapped on a newer topology
version.
+     * @param needVer If {@code true} returns values as tuples containing value and version.
+     * @param keepCacheObj Keep cache objects flag.
+     * @return Load future.
+     */
+    public final IgniteInternalFuture<Object> loadAsync(
+        KeyCacheObject key,
+        boolean readThrough,
+        boolean forcePrimary,
+        AffinityTopologyVersion topVer,
+        @Nullable UUID subjId,
+        String taskName,
+        boolean deserializePortable,
+        @Nullable IgniteCacheExpiryPolicy expiryPlc,
+        boolean skipVals,
+        boolean canRemap,
+        boolean needVer,
+        boolean keepCacheObj
+    ) {
+        GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx,
+            ctx.toCacheKeyObject(key),
+            topVer,
+            readThrough,
+            forcePrimary,
+            subjId,
+            taskName,
+            deserializePortable,
+            expiryPlc,
+            skipVals,
+            canRemap,
+            needVer,
+            keepCacheObj);
+
+        fut.init();
+
+        return fut;
+    }
+
+    /**
      * @param keys Keys to load.
      * @param readThrough Read through flag.
      * @param forcePrimary Force get from primary node flag.
@@ -364,9 +412,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
      * @param deserializePortable Deserialize portable flag.
      * @param expiryPlc Expiry policy.
      * @param skipVals Skip values flag.
-     * @return Loaded values.
+     * @param canRemap Flag indicating whether future can be remapped on a newer topology
version.
+     * @param needVer If {@code true} returns values as tuples containing value and version.
+     * @param keepCacheObj Keep cache objects flag.
+     * @return Load future.
      */
-    public IgniteInternalFuture<Map<K, V>> loadAsync(
+    public final IgniteInternalFuture<Map<K, V>> loadAsync(
         @Nullable Collection<KeyCacheObject> keys,
         boolean readThrough,
         boolean forcePrimary,

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ffcb554/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index 8f47fb7..4d1c974 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -386,6 +386,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
      * @param saved Reserved near cache entries.
      * @return Map.
      */
+    @SuppressWarnings("unchecked")
     private Map<KeyCacheObject, GridNearCacheEntry> map(
         KeyCacheObject key,
         Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings,
@@ -538,11 +539,17 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
                         }
                         else {
                             K key0 = key.value(cctx.cacheObjectContext(), true);
-                            V val0 = v.value(cctx.cacheObjectContext(), true);
-
-                            val0 = (V)cctx.unwrapPortableIfNeeded(val0, !deserializePortable);
                             key0 = (K)cctx.unwrapPortableIfNeeded(key0, !deserializePortable);
 
+                            V val0;
+
+                            if (!skipVals) {
+                                val0 = v.value(cctx.cacheObjectContext(), true);
+                                val0 = (V)cctx.unwrapPortableIfNeeded(val0, !deserializePortable);
+                            }
+                            else
+                                val0 = (V)Boolean.TRUE;
+
                             add(new GridFinishedFuture<>(Collections.singletonMap(key0,
val0)));
                         }
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ffcb554/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 434b6c7..115e00a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -332,7 +332,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
         boolean readThrough,
         boolean async,
         final Collection<KeyCacheObject> keys,
-        boolean skipVals,
+        final boolean skipVals,
         final boolean needVer,
         final GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c
     ) {
@@ -361,35 +361,70 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
             });
         }
         else if (cacheCtx.isColocated()) {
-            return cacheCtx.colocated().loadAsync(
-                keys,
-                readThrough,
-                /*force primary*/needVer,
-                topologyVersion(),
-                CU.subjectId(this, cctx),
-                resolveTaskName(),
-                /*deserializePortable*/false,
-                accessPolicy(cacheCtx, keys),
-                skipVals,
-                /*can remap*/true,
-                needVer,
-                /*keepCacheObject*/true
-            ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>()
{
-                @Override public Void apply(IgniteInternalFuture<Map<Object, Object>>
f) {
-                    try {
-                        Map<Object, Object> map = f.get();
-
-                        processLoaded(map, keys, needVer, c);
-
-                        return null;
+            if (keys.size() == 1) {
+                final KeyCacheObject key = F.first(keys);
+
+                return cacheCtx.colocated().loadAsync(
+                    key,
+                    readThrough,
+                    /*force primary*/needVer,
+                    topologyVersion(),
+                    CU.subjectId(this, cctx),
+                    resolveTaskName(),
+                    /*deserializePortable*/false,
+                    accessPolicy(cacheCtx, keys),
+                    skipVals,
+                    /*can remap*/true,
+                    needVer,
+                    /*keepCacheObject*/true
+                ).chain(new C1<IgniteInternalFuture<Object>, Void>() {
+                    @Override public Void apply(IgniteInternalFuture<Object> f) {
+                        try {
+                            Object val = f.get();
+
+                            processLoaded(key, val, needVer, skipVals, c);
+
+                            return null;
+                        }
+                        catch (Exception e) {
+                            setRollbackOnly();
+
+                            throw new GridClosureException(e);
+                        }
                     }
-                    catch (Exception e) {
-                        setRollbackOnly();
-
-                        throw new GridClosureException(e);
+                });
+            }
+            else {
+                return cacheCtx.colocated().loadAsync(
+                    keys,
+                    readThrough,
+                    /*force primary*/needVer,
+                    topologyVersion(),
+                    CU.subjectId(this, cctx),
+                    resolveTaskName(),
+                    /*deserializePortable*/false,
+                    accessPolicy(cacheCtx, keys),
+                    skipVals,
+                    /*can remap*/true,
+                    needVer,
+                    /*keepCacheObject*/true
+                ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>,
Void>() {
+                    @Override public Void apply(IgniteInternalFuture<Map<Object, Object>>
f) {
+                        try {
+                            Map<Object, Object> map = f.get();
+
+                            processLoaded(map, keys, needVer, c);
+
+                            return null;
+                        }
+                        catch (Exception e) {
+                            setRollbackOnly();
+
+                            throw new GridClosureException(e);
+                        }
                     }
-                }
-            });
+                });
+            }
         }
         else {
             assert cacheCtx.isLocal();
@@ -409,29 +444,45 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
         final Collection<KeyCacheObject> keys,
         boolean needVer,
         GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c) {
-        for (KeyCacheObject key : keys) {
-            Object val = map.get(key);
+        for (KeyCacheObject key : keys)
+            processLoaded(key, map.get(key), needVer, false, c);
+    }
 
-            if (val != null) {
-                Object v;
-                GridCacheVersion ver;
+    /**
+     * @param key Key.
+     * @param val Value.
+     * @param needVer If {@code true} version is required for loaded values.
+     * @param skipVals Skip values flag.
+     * @param c Closure.
+     */
+    private void processLoaded(
+        KeyCacheObject key,
+        @Nullable Object val,
+        boolean needVer,
+        boolean skipVals,
+        GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c) {
+        if (val != null) {
+            Object v;
+            GridCacheVersion ver;
 
-                if (needVer) {
-                    T2<Object, GridCacheVersion> t = (T2)val;
+            if (needVer) {
+                T2<Object, GridCacheVersion> t = (T2)val;
 
-                    v = t.get1();
-                    ver = t.get2();
-                }
-                else {
-                    v = val;
-                    ver = null;
-                }
-
-                c.apply(key, v, ver);
+                v = t.get1();
+                ver = t.get2();
             }
-            else
+            else {
+                v = val;
+                ver = null;
+            }
+
+            if (skipVals && v == Boolean.FALSE)
                 c.apply(key, null, IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER);
+            else
+                c.apply(key, v, ver);
         }
+        else
+            c.apply(key, null, IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5ffcb554/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 758f82c..68c12a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1365,7 +1365,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             if (txEntry != null) {
                 CacheObject val = txEntry.value();
 
-                // Read value from locked entry in group-lock transaction as well.
                 if (txEntry.hasValue()) {
                     if (!F.isEmpty(txEntry.entryProcessors()))
                         val = txEntry.applyEntryProcessors(val);
@@ -2217,6 +2216,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     /**
      * @param cacheCtx Cache context.
      * @param keys Keys to load.
+     * @param filter Filter.
      * @param ret Return value.
      * @param needReadVer Read version flag.
      * @param singleRmv {@code True} for single remove operation.


Mime
View raw message