ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [08/24] incubator-ignite git commit: # IGNITE-138: Fixed last places where cache requests were always passed through system pool.
Date Thu, 12 Feb 2015 12:57:26 GMT
# IGNITE-138: Fixed last places where cache requests were always passed through system pool.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/74b3f284
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/74b3f284
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/74b3f284

Branch: refs/heads/ignite-50
Commit: 74b3f284b0c8971abd8a5eccc2db71654425282c
Parents: a2bdfad
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Wed Feb 11 11:01:56 2015 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Wed Feb 11 11:01:56 2015 +0300

----------------------------------------------------------------------
 ...ridCacheOptimisticCheckPreparedTxFuture.java |  4 +--
 ...idCacheOptimisticCheckPreparedTxRequest.java | 30 +++++++++++++++++++-
 ...CachePessimisticCheckCommittedTxRequest.java | 27 ++++++++++++++++++
 ...achePessimisticCheckCommittedTxResponse.java | 29 ++++++++++++++++++-
 .../cache/transactions/IgniteTxHandler.java     | 11 +++----
 5 files changed, 92 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74b3f284/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
index 7047780..75b2683 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxFuture.java
@@ -175,8 +175,8 @@ public class GridCacheOptimisticCheckPreparedTxFuture<K, V> extends GridCompound
 
                 add(fut);
 
-                GridCacheOptimisticCheckPreparedTxRequest<K, V> req = new GridCacheOptimisticCheckPreparedTxRequest<>(tx,
-                        nodeTransactions(nodeId), futureId(), fut.futureId());
+                GridCacheOptimisticCheckPreparedTxRequest<K, V> req = new GridCacheOptimisticCheckPreparedTxRequest<>(
+                    tx, nodeTransactions(nodeId), futureId(), fut.futureId());
 
                 try {
                     cctx.io().send(nodeId, req, tx.ioPolicy());

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74b3f284/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
index f20f273..611eeda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheOptimisticCheckPreparedTxRequest.java
@@ -46,6 +46,9 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, V> extends GridDistrib
     /** Expected number of transactions on node. */
     private int txNum;
 
+    /** System transaction flag. */
+    private boolean sys;
+
     /**
      * Empty constructor required by {@link Externalizable}
      */
@@ -59,10 +62,13 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, V> extends GridDistrib
      * @param futId Future ID.
      * @param miniId Mini future ID.
      */
-    public GridCacheOptimisticCheckPreparedTxRequest(IgniteInternalTx<K, V> tx, int txNum, IgniteUuid futId, IgniteUuid miniId) {
+    public GridCacheOptimisticCheckPreparedTxRequest(IgniteInternalTx<K, V> tx, int txNum, IgniteUuid futId,
+        IgniteUuid miniId) {
         super(tx.xidVersion(), 0);
 
         nearXidVer = tx.nearXidVersion();
+        sys = tx.system();
+
         this.futId = futId;
         this.miniId = miniId;
         this.txNum = txNum;
@@ -96,6 +102,13 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, V> extends GridDistrib
         return txNum;
     }
 
+    /**
+     * @return System transaction flag.
+     */
+    public boolean system() {
+        return sys;
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
     @Override public MessageAdapter clone() {
@@ -116,6 +129,7 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, V> extends GridDistrib
         _clone.miniId = miniId;
         _clone.nearXidVer = nearXidVer != null ? (GridCacheVersion)nearXidVer.clone() : null;
         _clone.txNum = txNum;
+        _clone.sys = sys;
     }
 
     /** {@inheritDoc} */
@@ -153,6 +167,12 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, V> extends GridDistrib
                 state++;
 
             case 11:
+                if (!writer.writeBoolean("sys", sys))
+                    return false;
+
+                state++;
+
+            case 12:
                 if (!writer.writeInt("txNum", txNum))
                     return false;
 
@@ -197,6 +217,14 @@ public class GridCacheOptimisticCheckPreparedTxRequest<K, V> extends GridDistrib
                 state++;
 
             case 11:
+                sys = reader.readBoolean("sys");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
+            case 12:
                 txNum = reader.readInt("txNum");
 
                 if (!reader.isLastRead())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74b3f284/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxRequest.java
index b4a6833..21bbf17 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxRequest.java
@@ -53,6 +53,9 @@ public class GridCachePessimisticCheckCommittedTxRequest<K, V> extends GridDistr
     /** Flag indicating that this is near-only check. */
     private boolean nearOnlyCheck;
 
+    /** System transaction flag. */
+    private boolean sys;
+
     /**
      * Empty constructor required by {@link Externalizable}
      */
@@ -74,6 +77,8 @@ public class GridCachePessimisticCheckCommittedTxRequest<K, V> extends GridDistr
 
         nearXidVer = tx.nearXidVersion();
         originatingNodeId = tx.eventNodeId();
+        sys = tx.system();
+
         this.originatingThreadId = originatingThreadId;
     }
 
@@ -127,6 +132,13 @@ public class GridCachePessimisticCheckCommittedTxRequest<K, V> extends GridDistr
         return nearOnlyCheck;
     }
 
+    /**
+     * @return System transaction flag.
+     */
+    public boolean system() {
+        return sys;
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
     @Override public MessageAdapter clone() {
@@ -149,6 +161,7 @@ public class GridCachePessimisticCheckCommittedTxRequest<K, V> extends GridDistr
         _clone.originatingNodeId = originatingNodeId;
         _clone.originatingThreadId = originatingThreadId;
         _clone.nearOnlyCheck = nearOnlyCheck;
+        _clone.sys = sys;
     }
 
     /** {@inheritDoc} */
@@ -203,6 +216,12 @@ public class GridCachePessimisticCheckCommittedTxRequest<K, V> extends GridDistr
 
                 state++;
 
+            case 14:
+                if (!writer.writeBoolean("sys", sys))
+                    return false;
+
+                state++;
+
         }
 
         return true;
@@ -265,6 +284,14 @@ public class GridCachePessimisticCheckCommittedTxRequest<K, V> extends GridDistr
 
                 state++;
 
+            case 14:
+                sys = reader.readBoolean("sys");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
         }
 
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74b3f284/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxResponse.java
index 36983be..922555e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCachePessimisticCheckCommittedTxResponse.java
@@ -49,6 +49,9 @@ public class GridCachePessimisticCheckCommittedTxResponse<K, V> extends GridDist
     /** Serialized transaction info. */
     private byte[] committedTxInfoBytes;
 
+    /** System transaction flag. */
+    private boolean sys;
+
     /**
      * Empty constructor required by {@link Externalizable}
      */
@@ -61,14 +64,16 @@ public class GridCachePessimisticCheckCommittedTxResponse<K, V> extends GridDist
      * @param futId Future ID.
      * @param miniId Mini future ID.
      * @param committedTxInfo Committed transaction info.
+     * @param sys System transaction flag.
      */
     public GridCachePessimisticCheckCommittedTxResponse(GridCacheVersion txId, IgniteUuid futId, IgniteUuid miniId,
-        @Nullable GridCacheCommittedTxInfo<K, V> committedTxInfo) {
+        @Nullable GridCacheCommittedTxInfo<K, V> committedTxInfo, boolean sys) {
         super(txId, 0);
 
         this.futId = futId;
         this.miniId = miniId;
         this.committedTxInfo = committedTxInfo;
+        this.sys = sys;
     }
 
     /**
@@ -92,6 +97,13 @@ public class GridCachePessimisticCheckCommittedTxResponse<K, V> extends GridDist
         return committedTxInfo;
     }
 
+    /**
+     * @return System transaction flag.
+     */
+    public boolean system() {
+        return sys;
+    }
+
     /** {@inheritDoc}
      * @param ctx*/
     @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
@@ -135,6 +147,7 @@ public class GridCachePessimisticCheckCommittedTxResponse<K, V> extends GridDist
         _clone.miniId = miniId;
         _clone.committedTxInfo = committedTxInfo;
         _clone.committedTxInfoBytes = committedTxInfoBytes;
+        _clone.sys = sys;
     }
 
     /** {@inheritDoc} */
@@ -171,6 +184,12 @@ public class GridCachePessimisticCheckCommittedTxResponse<K, V> extends GridDist
 
                 state++;
 
+            case 11:
+                if (!writer.writeBoolean("sys", sys))
+                    return false;
+
+                state++;
+
         }
 
         return true;
@@ -209,6 +228,14 @@ public class GridCachePessimisticCheckCommittedTxResponse<K, V> extends GridDist
 
                 state++;
 
+            case 11:
+                sys = reader.readBoolean("sys");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                state++;
+
         }
 
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/74b3f284/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index bcf2018..7eb966d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -1407,7 +1407,7 @@ public class IgniteTxHandler<K, V> {
             if (log.isDebugEnabled())
                 log.debug("Sending check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']');
 
-            ctx.io().send(nodeId, res, SYSTEM_POOL);
+            ctx.io().send(nodeId, res, req.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
         }
         catch (ClusterTopologyCheckedException ignored) {
             if (log.isDebugEnabled())
@@ -1463,13 +1463,14 @@ public class IgniteTxHandler<K, V> {
                 }
 
                 GridCachePessimisticCheckCommittedTxResponse<K, V>
-                    res = new GridCachePessimisticCheckCommittedTxResponse<>(
-                    req.version(), req.futureId(), req.miniId(), info);
+                    res = new GridCachePessimisticCheckCommittedTxResponse<>(req.version(), req.futureId(),
+                    req.miniId(), info, req.system());
 
                 if (log.isDebugEnabled())
                     log.debug("Finished waiting for tx committed info [req=" + req + ", res=" + res + ']');
 
-                sendCheckCommittedResponse(nodeId, res);            }
+                sendCheckCommittedResponse(nodeId, res);
+            }
         });
     }
 
@@ -1507,7 +1508,7 @@ public class IgniteTxHandler<K, V> {
             if (log.isDebugEnabled())
                 log.debug("Sending check committed transaction response [nodeId=" + nodeId + ", res=" + res + ']');
 
-            ctx.io().send(nodeId, res, SYSTEM_POOL);
+            ctx.io().send(nodeId, res, res.system() ? UTILITY_CACHE_POOL : SYSTEM_POOL);
         }
         catch (ClusterTopologyCheckedException ignored) {
             if (log.isDebugEnabled())


Mime
View raw message