ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: 'Single' operations optimizations for tx cache.
Date Mon, 16 Nov 2015 12:07:36 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-single-op-tx a03c16508 -> 6b38cd05c


'Single' operations optimizations for tx cache.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6b38cd05
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6b38cd05
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6b38cd05

Branch: refs/heads/ignite-single-op-tx
Commit: 6b38cd05c6c073d8ca1d0493bd8029f3beda2209
Parents: a03c165
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Nov 16 14:38:10 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Nov 16 15:07:21 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 46 ++++++++++----------
 .../cache/GridCacheSharedContext.java           | 11 ++++-
 .../IgniteTxImplicitSingleStateImpl.java        |  5 +++
 .../transactions/IgniteTxLocalAdapter.java      | 23 ++++++++--
 .../cache/transactions/IgniteTxLocalEx.java     | 15 ++++++-
 .../IgniteTxRemoteStateAdapter.java             |  5 +++
 .../cache/transactions/IgniteTxState.java       |  2 +
 .../cache/transactions/IgniteTxStateImpl.java   | 17 ++++++--
 8 files changed, 90 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 49ca1dc..cbb7486 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1854,7 +1854,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
         V prevVal = syncOp(new SyncOp<V>(true) {
             @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException
{
-                return (V)tx.putAllAsync(ctx, F.t(key, val), true, filter).get().value();
+                return (V)tx.putAsync(ctx, key, val, true, filter).get().value();
             }
 
             @Override public String toString() {
@@ -1909,7 +1909,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
         return asyncOp(new AsyncOp<V>() {
             @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
-                return tx.putAllAsync(ctx, F.t(key, val), true, filter)
+                return tx.putAsync(ctx, key, val, true, filter)
                     .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>,
V>)RET2VAL);
             }
 
@@ -2013,10 +2013,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
         return syncOp(new SyncOp<EntryProcessorResult<T>>(true) {
             @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter
tx)
                 throws IgniteCheckedException {
-                Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
-                    Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor);
-
-                IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx, invokeMap,
args);
+                IgniteInternalFuture<GridCacheReturn> fut =
+                    tx.invokeAsync(ctx, key, (EntryProcessor<K, V, Object>)entryProcessor,
args);
 
                 Map<K, EntryProcessorResult<T>> resMap = fut.get().value();
 
@@ -2240,8 +2238,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
         return asyncOp(new AsyncOp<Boolean>() {
             @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter
tx) {
-                return tx.putAllAsync(ctx, F.t(key, val), false, filter).chain(
-                    (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)
RET2FLAG);
+                return tx.putAsync(ctx, key, val, false, filter).chain(
+                    (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
             }
 
             @Override public String toString() {
@@ -2275,7 +2273,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
         return syncOp(new SyncOp<V>(true) {
             @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException
{
-                return (V)tx.putAllAsync(ctx, F.t(key, val), true, ctx.noValArray()).get().value();
+                return (V)tx.putAsync(ctx, key, val, true, ctx.noValArray()).get().value();
             }
 
             @Override public String toString() {
@@ -2299,8 +2297,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
         IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
             @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
-                return tx.putAllAsync(ctx, F.t(key, val), true, ctx.noValArray())
-                    .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>,
V>) RET2VAL);
+                return tx.putAsync(ctx, key, val, true, ctx.noValArray())
+                    .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>,
V>)RET2VAL);
             }
 
             @Override public String toString() {
@@ -2329,7 +2327,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
         Boolean stored = syncOp(new SyncOp<Boolean>(true) {
             @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException
{
-                return tx.putAllAsync(ctx, F.t(key, val), false, ctx.noValArray()).get().success();
+                return tx.putAsync(ctx, key, val, false, ctx.noValArray()).get().success();
             }
 
             @Override public String toString() {
@@ -2358,7 +2356,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
         IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
             @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter
tx) {
-                return tx.putAllAsync(ctx, F.t(key, val), false, ctx.noValArray()).chain(
+                return tx.putAsync(ctx, key, val, false, ctx.noValArray()).chain(
                     (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
             }
 
@@ -2384,7 +2382,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
         return syncOp(new SyncOp<V>(true) {
             @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException
{
-                return (V)tx.putAllAsync(ctx, F.t(key, val), true, ctx.hasValArray()).get().value();
+                return (V)tx.putAsync(ctx, key, val, true, ctx.hasValArray()).get().value();
             }
 
             @Override public String toString() {
@@ -2408,7 +2406,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
         IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
             @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
-                return tx.putAllAsync(ctx, F.t(key, val), true, ctx.hasValArray()).chain(
+                return tx.putAsync(ctx, key, val, true, ctx.hasValArray()).chain(
                     (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
             }
 
@@ -2434,7 +2432,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
         return syncOp(new SyncOp<Boolean>(true) {
             @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException
{
-                return tx.putAllAsync(ctx, F.t(key, val), false, ctx.hasValArray()).get().success();
+                return tx.putAsync(ctx, key, val, false, ctx.hasValArray()).get().success();
             }
 
             @Override public String toString() {
@@ -2454,7 +2452,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
 
         return asyncOp(new AsyncOp<Boolean>() {
             @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter
tx) {
-                return tx.putAllAsync(ctx, F.t(key, val), false, ctx.hasValArray()).chain(
+                return tx.putAsync(ctx, key, val, false, ctx.hasValArray()).chain(
                     (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)
RET2FLAG);
             }
 
@@ -2481,7 +2479,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
                 if (ctx.deploymentEnabled())
                     ctx.deploy().registerClass(oldVal);
 
-                return tx.putAllAsync(ctx, F.t(key, newVal), false, ctx.equalsValArray(oldVal)).get()
+                return tx.putAsync(ctx, key, newVal, false, ctx.equalsValArray(oldVal)).get()
                     .success();
             }
 
@@ -2518,7 +2516,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
                     }
                 }
 
-                return tx.putAllAsync(ctx, F.t(key, newVal), false, ctx.equalsValArray(oldVal)).chain(
+                return tx.putAsync(ctx, key, newVal, false, ctx.equalsValArray(oldVal)).chain(
                     (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
             }
 
@@ -2883,8 +2881,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
                 if (ctx.deploymentEnabled())
                     ctx.deploy().registerClass(oldVal);
 
-                return tx.putAllAsync(ctx,
-                        F.t(key, newVal),
+                return tx.putAsync(ctx,
+                        key,
+                        newVal,
                         true,
                         ctx.equalsValArray(oldVal)).get();
             }
@@ -2945,8 +2944,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
                     return new GridFinishedFuture<>(e);
                 }
 
-                return (IgniteInternalFuture)tx.putAllAsync(ctx,
-                    F.t(key, newVal),
+                return (IgniteInternalFuture)tx.putAsync(ctx,
+                    key,
+                    newVal,
                     true,
                     ctx.equalsValArray(oldVal));
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 5321bb3..4293b90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -591,10 +591,17 @@ public class GridCacheSharedContext<K, V> {
      * @param tx Transaction to commit.
      * @return Commit future.
      */
+    @SuppressWarnings("unchecked")
     public IgniteInternalFuture<IgniteInternalTx> commitTxAsync(IgniteInternalTx tx)
{
-        tx.txState().awaitLastFut(this);
+        GridCacheContext ctx = tx.txState().singleCacheContext(this);
 
-        return tx.commitAsync();
+        if (ctx == null) {
+            tx.txState().awaitLastFut(this);
+
+            return tx.commitAsync();
+        }
+        else
+            return ctx.cache().commitTxAsync(tx);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index 1b99159..c4012e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -58,6 +58,11 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter
{
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext
cctx) {
+        return cacheCtx;
+    }
+
+    /** {@inheritDoc} */
     @Nullable @Override public Integer firstCacheId() {
         return cacheCtx != null ? cacheCtx.cacheId() : null;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index d22d6f4..895bcd7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1922,7 +1922,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         V val,
         boolean retval,
         CacheEntryPredicate[] filter) {
-        return putAsync0(cacheCtx, key, val, retval, filter);
+        return putAsync0(cacheCtx, key, val, null, null, retval, filter);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(GridCacheContext
cacheCtx,
+        K key,
+        EntryProcessor<K, V, Object> entryProcessor,
+        Object... invokeArgs) {
+        return (IgniteInternalFuture)putAsync0(cacheCtx, key, null, entryProcessor, invokeArgs,
true, null);
     }
 
     /** {@inheritDoc} */
@@ -2015,6 +2023,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
             final boolean needVal = singleRmv || retval || hasFilters;
             final boolean needReadVer = needVal && (serializable() && optimistic());
 
+            if (entryProcessor != null)
+                transform = true;
+
             boolean loadMissed = enlistWriteEntry(cacheCtx,
                 cacheKey,
                 val,
@@ -2847,6 +2858,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
      * @param cacheCtx Cache context.
      * @param key Key.
      * @param val Value.
+     * @param entryProcessor Entry processor.
+     * @param invokeArgs Optional arguments for EntryProcessor.
      * @param retval Return value flag.
      * @param filter Filter.
      * @return Operation future.
@@ -2854,7 +2867,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     private <K, V> IgniteInternalFuture putAsync0(
         final GridCacheContext cacheCtx,
         K key,
-        V val,
+        @Nullable V val,
+        @Nullable EntryProcessor entryProcessor,
+        @Nullable final Object[] invokeArgs,
         final boolean retval,
         @Nullable final CacheEntryPredicate[] filter
     ) {
@@ -2874,8 +2889,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
                 cacheKey,
                 val,
                 opCtx != null ? opCtx.expiry() : null,
-                null,
-                null,
+                entryProcessor,
+                invokeArgs,
                 retval,
                 /*lockOnly*/false,
                 filter,

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index f9555cc..5dc3338 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -108,9 +108,22 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
 
     /**
      * @param cacheCtx Cache context.
+     * @param key Key.
+     * @param entryProcessor Entry processor.
+     * @param invokeArgs Optional arguments for entry processor.
+     * @return Operation future.
+     */
+    public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(
+        GridCacheContext cacheCtx,
+        K key,
+        EntryProcessor<K, V, Object> entryProcessor,
+        Object... invokeArgs);
+
+    /**
+     * @param cacheCtx Cache context.
      * @param map Entry processors map.
      * @param invokeArgs Optional arguments for entry processor.
-     * @return Transform operation future.
+     * @return Operation future.
      */
     public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync(
         GridCacheContext cacheCtx,

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
index 9be37e1..5b15296 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
@@ -99,6 +99,11 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext
cctx) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean
commit) {
         assert false;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
index cb9d93d..4965c29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -44,6 +44,8 @@ public interface IgniteTxState {
      */
     @Nullable public Integer firstCacheId();
 
+    @Nullable GridCacheContext singleCacheContext(GridCacheSharedContext cctx);
+
     /**
      *
      * @param cctx

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b38cd05/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index 1e12fe4..d80cef9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -69,11 +69,20 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public void awaitLastFut(GridCacheSharedContext cctx) {
-        if (activeCacheIds.size() > 1) {
-            for (Integer cacheId : activeCacheIds)
-                cctx.cacheContext(cacheId).cache().awaitLastFut();
+    @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext
cctx) {
+        if (activeCacheIds.size() == 1) {
+            int cacheId = F.first(activeCacheIds);
+
+            return cctx.cacheContext(cacheId);
         }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void awaitLastFut(GridCacheSharedContext cctx) {
+        for (Integer cacheId : activeCacheIds)
+            cctx.cacheContext(cacheId).cache().awaitLastFut();
     }
 
     /** {@inheritDoc} */


Mime
View raw message