ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [1/2] ignite git commit: ignite-3478
Date Thu, 31 Aug 2017 14:48:02 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3478 855c2d457 -> 08be7310a


http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java
new file mode 100644
index 0000000..40354a8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAssignmentHistory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+/**
+ * Records which node was assigned as coordinator for each topology version.
+ * <p>
+ * Both fields are volatile and are only ever replaced by a freshly built object, so readers
+ * work on whatever snapshot they load. {@link #addAssignment} is not synchronized;
+ * presumably it is invoked from a single thread - TODO confirm against callers.
+ */
+class CoordinatorAssignmentHistory {
+    /** Coordinator per topology version; replaced by a new map on every addition. */
+    private volatile Map<AffinityTopologyVersion, ClusterNode> assignHist = Collections.emptyMap();
+
+    /** Most recently added assignment (topology version and coordinator). */
+    private volatile IgniteBiTuple<AffinityTopologyVersion, ClusterNode>
+        cur = new IgniteBiTuple<>(AffinityTopologyVersion.NONE, null);
+
+    /**
+     * Adds the coordinator assignment for a new topology version.
+     *
+     * @param topVer New topology version (asserted to be unrecorded and greater than the current one).
+     * @param crd Coordinator node assigned for {@code topVer}.
+     */
+    void addAssignment(AffinityTopologyVersion topVer, ClusterNode crd) {
+        assert !assignHist.containsKey(topVer);
+        assert topVer.compareTo(cur.get1()) > 0;
+
+        cur = new IgniteBiTuple<>(topVer, crd);
+
+        // Copy, modify, then publish, so that a map visible to readers is never mutated.
+        Map<AffinityTopologyVersion, ClusterNode> hist = new HashMap<>(assignHist);
+
+        hist.put(topVer, crd);
+
+        assignHist = hist;
+    }
+
+    /**
+     * @return Coordinator from the most recently added assignment ({@code null} before the first addition).
+     */
+    ClusterNode currentCoordinator() {
+        return cur.get2();
+    }
+
+    /**
+     * @param topVer Topology version (asserted to be initialized).
+     * @return Coordinator recorded for the given topology version.
+     */
+    ClusterNode coordinator(AffinityTopologyVersion topVer) {
+        assert topVer.initialized() : topVer;
+
+        IgniteBiTuple<AffinityTopologyVersion, ClusterNode> cur0 = cur;
+
+        if (cur0.get1().equals(topVer))
+            return cur0.get2();
+
+        Map<AffinityTopologyVersion, ClusterNode> assignHist0 = assignHist;
+
+        // Check the same snapshot that is reported in the message and used for the lookup below.
+        assert assignHist0.containsKey(topVer) :
+            "No coordinator assignment [topVer=" + topVer + ", curVer=" + cur0.get1() +
+                ", hist=" + assignHist0.keySet() + ']';
+
+        return assignHist0.get(topVer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 7598003..dfe0e06 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -634,4 +634,9 @@ public interface IgniteInternalTx {
      * @param e Commit error.
      */
     public void commitError(Throwable e);
+
+    /**
+     * @param mvccCrdCntr Update counter assigned by MVCC coordinator.
+     */
+    public void mvccCoordinatorCounter(long mvccCrdCntr);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 4d85db5..8ad717a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheLazyPlainVersionedEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -245,6 +246,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
     /** Store used flag. */
     protected boolean storeEnabled = true;
 
+    /** */
+    private long mvccCrdCntr = TxMvccVersion.COUNTER_NA;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -1525,6 +1529,33 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         return (taskName = cctx.kernalContext().task().resolveTaskName(taskNameHash));
     }
 
+    /** {@inheritDoc} */
+    public final void mvccCoordinatorCounter(long mvccCrdCntr) {
+        this.mvccCrdCntr = mvccCrdCntr;
+    }
+
+    /**
+     * @return Coordinator counter.
+     */
+    public final long mvccCoordinatorCounter() {
+        return mvccCrdCntr;
+    }
+
+    /**
+     * @return Mvcc version.
+     */
+    protected final TxMvccVersion createMvccVersion() {
+        assert !txState().mvccEnabled(cctx) || mvccCrdCntr != TxMvccVersion.COUNTER_NA : mvccCrdCntr;
+
+        if (mvccCrdCntr != TxMvccVersion.COUNTER_NA) {
+            return new TxMvccVersion(topologyVersion().topologyVersion(),
+                mvccCrdCntr,
+                nearXidVersion());
+        }
+
+        return null;
+    }
+
     /**
      * Resolve DR conflict.
      *
@@ -1824,6 +1855,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
         }
 
         /** {@inheritDoc} */
+        @Override public void mvccCoordinatorCounter(long mvccCrdCntr) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
         @Override public boolean localResult() {
             return false;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index beeb184..cac1069 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -254,11 +254,7 @@ public class IgniteTxHandler {
     ) {
         req.txState(locTx.txState());
 
-        IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(
-            req.reads(),
-            req.writes(),
-            req.transactionNodes(),
-            req.last());
+        IgniteInternalFuture<GridNearTxPrepareResponse> fut = locTx.prepareAsyncLocal(req);
 
         if (locTx.isRollbackOnly())
             locTx.rollbackNearTxLocalAsync();
@@ -520,14 +516,7 @@ public class IgniteTxHandler {
             if (req.needReturnValue())
                 tx.needReturnValue(true);
 
-            IgniteInternalFuture<GridNearTxPrepareResponse> fut = tx.prepareAsync(
-                req.reads(),
-                req.writes(),
-                req.dhtVersions(),
-                req.messageId(),
-                req.miniId(),
-                req.transactionNodes(),
-                req.last());
+            IgniteInternalFuture<GridNearTxPrepareResponse> fut = tx.prepareAsync(req);
 
             if (tx.isRollbackOnly() && !tx.commitOnPrepare()) {
                 if (tx.state() != TransactionState.ROLLED_BACK && tx.state() != TransactionState.ROLLING_BACK)
@@ -1322,6 +1311,7 @@ public class IgniteTxHandler {
                 tx.commitVersion(req.commitVersion());
                 tx.invalidate(req.isInvalidate());
                 tx.systemInvalidate(req.isSystemInvalidate());
+                tx.mvccCoordinatorCounter(req.mvccCoordinatorCounter());
 
                 // Complete remote candidates.
                 tx.doneRemote(req.baseVersion(), null, null, null);
@@ -1368,6 +1358,7 @@ public class IgniteTxHandler {
         try {
             tx.commitVersion(req.writeVersion());
             tx.invalidate(req.isInvalidate());
+            tx.mvccCoordinatorCounter(req.mvccCoordinatorCounter());
 
             // Complete remote candidates.
             tx.doneRemote(req.version(), null, null, null);

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index 10b06d8..5efe225 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -289,6 +289,13 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean mvccEnabled(GridCacheSharedContext cctx) {
+        GridCacheContext ctx0 = cacheCtx;
+
+        return ctx0 != null && ctx0.mvccEnabled();
+    }
+
+    /** {@inheritDoc} */
     public String toString() {
         return S.toString(IgniteTxImplicitSingleStateImpl.class, this);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index e7ebaae..836eecc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateTxResult;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
@@ -507,6 +508,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
             try {
                 cctx.tm().txContext(this);
 
+                TxMvccVersion mvccVer = createMvccVersion();
+
                 AffinityTopologyVersion topVer = topologyVersion();
 
                 /*
@@ -684,7 +687,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                             CU.subjectId(this, cctx),
                                             resolveTaskName(),
                                             dhtVer,
-                                            null);
+                                            null,
+                                            mvccVer);
 
                                         if (updRes.success())
                                             txEntry.updateCounter(updRes.updatePartitionCounter());
@@ -711,7 +715,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                                 CU.subjectId(this, cctx),
                                                 resolveTaskName(),
                                                 dhtVer,
-                                                null);
+                                                null,
+                                                mvccVer);
                                         }
                                     }
                                     else if (op == DELETE) {
@@ -732,7 +737,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                             CU.subjectId(this, cctx),
                                             resolveTaskName(),
                                             dhtVer,
-                                            null);
+                                            null,
+                                            mvccVer);
 
                                         if (updRes.success())
                                             txEntry.updateCounter(updRes.updatePartitionCounter());
@@ -755,7 +761,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                                                 CU.subjectId(this, cctx),
                                                 resolveTaskName(),
                                                 dhtVer,
-                                                null);
+                                                null,
+                                                mvccVer);
                                         }
                                     }
                                     else if (op == RELOAD) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
index b61a99c..2fe63fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
@@ -142,4 +142,9 @@ public class IgniteTxRemoteSingleStateImpl extends IgniteTxRemoteStateAdapter {
 
         return null;
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean mvccEnabled(GridCacheSharedContext cctx) {
+        return entry != null ? entry.context().mvccEnabled() : false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java
index 1326491..1b6c656 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java
@@ -209,4 +209,14 @@ public class IgniteTxRemoteStateImpl extends IgniteTxRemoteStateAdapter {
 
         return null;
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean mvccEnabled(GridCacheSharedContext cctx) {
+        for (IgniteTxEntry e : writeMap.values()) {
+            if (e.context().mvccEnabled())
+                return true;
+        }
+
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
index 1fe0d2a..29cd728 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -180,4 +180,10 @@ public interface IgniteTxState {
      * @return {@code True} if transaction is empty.
      */
     public boolean empty();
+
+    /**
+     * @param cctx Context.
+     * @return {@code True} if MVCC mode is enabled for transaction.
+     */
+    public boolean mvccEnabled(GridCacheSharedContext cctx);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index 4f14b5c..ea0cde4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -462,6 +462,15 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean mvccEnabled(GridCacheSharedContext cctx) {
+        assert !activeCacheIds.isEmpty();
+
+        int cacheId = activeCacheIds.get(0);
+
+        return cctx.cacheContext(cacheId).mvccEnabled();
+    }
+
+    /** {@inheritDoc} */
     public String toString() {
         return S.toString(IgniteTxStateImpl.class, this, "txMap", allEntriesCopy());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 6712b5b..f0e19c6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -27,6 +27,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.eviction.EvictableEntry;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -456,7 +457,9 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         UUID subjId,
         String taskName,
         @Nullable GridCacheVersion dhtVer,
-        @Nullable Long updateCntr)
+        @Nullable Long updateCntr,
+        @Nullable TxMvccVersion mvccVer
+    )
         throws IgniteCheckedException, GridCacheEntryRemovedException {
         return new GridCacheUpdateTxResult(true, rawPut(val, ttl));
     }
@@ -536,7 +539,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
         UUID subjId,
         String taskName,
         @Nullable GridCacheVersion dhtVer,
-        @Nullable Long updateCntr
+        @Nullable Long updateCntr,
+        @Nullable TxMvccVersion mvccVer
         ) throws IgniteCheckedException, GridCacheEntryRemovedException {
         obsoleteVer = ver;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/08be7310/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index ee6cfd0..7920e0a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -204,7 +204,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
         CacheMode cacheMode,
         CacheWriteSynchronizationMode syncMode,
         int backups) {
-        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
 
         ccfg.setCacheMode(cacheMode);
         ccfg.setAtomicityMode(TRANSACTIONAL);


Mime
View raw message