ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject ignite git commit: IGNITE-GG-10837 WIP
Date Mon, 18 Jan 2016 17:35:05 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-gg-10837 32cc13935 -> 272f397f5


IGNITE-GG-10837 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/272f397f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/272f397f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/272f397f

Branch: refs/heads/ignite-gg-10837
Commit: 272f397f5451193d3454fdfda5b350cfd4873088
Parents: 32cc139
Author: nikolay_tikhonov <ntikhonov@gridgain.com>
Authored: Mon Jan 18 20:34:20 2016 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Mon Jan 18 20:34:20 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheOperationContext.java | 50 ++++++++++++++++
 .../processors/cache/IgniteCacheProxy.java      | 28 +++++++++
 .../dht/atomic/GridDhtAtomicCache.java          | 60 +++++++++++++++++---
 .../processors/cache/dr/GridCacheDrInfo.java    |  9 +++
 .../transactions/IgniteTxLocalAdapter.java      | 44 +++++++++++---
 .../datastreamer/DataStreamerImpl.java          |  1 +
 .../ignite/testframework/GridTestUtils.java     |  2 +-
 7 files changed, 178 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/272f397f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
index 21934d0..5f64dc5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
@@ -48,6 +48,9 @@ public class CacheOperationContext implements Serializable {
     /** Expiry policy. */
     private final ExpiryPolicy expiryPlc;
 
+    /** Data center Id. */
+    private final Byte dataCenterId;
+
     /**
      * Constructor with default values.
      */
@@ -61,6 +64,8 @@ public class CacheOperationContext implements Serializable {
         expiryPlc = null;
 
         noRetries = false;
+
+        dataCenterId = null;
     }
 
     /**
@@ -84,6 +89,35 @@ public class CacheOperationContext implements Serializable {
         this.expiryPlc = expiryPlc;
 
         this.noRetries = noRetries;
+
+        this.dataCenterId = 1;
+    }
+
+    /**
+     * @param skipStore Skip store flag.
+     * @param subjId Subject ID.
+     * @param keepBinary Keep binary flag.
+     * @param expiryPlc Expiry policy.
+     * @param dataCenterId Data center id.
+     */
+    public CacheOperationContext(
+        boolean skipStore,
+        @Nullable UUID subjId,
+        boolean keepBinary,
+        @Nullable ExpiryPolicy expiryPlc,
+        boolean noRetries,
+        @Nullable Byte dataCenterId) {
+        this.skipStore = skipStore;
+
+        this.subjId = subjId;
+
+        this.keepBinary = keepBinary;
+
+        this.expiryPlc = expiryPlc;
+
+        this.noRetries = noRetries;
+
+        this.dataCenterId = dataCenterId;
     }
 
     /**
@@ -94,6 +128,13 @@ public class CacheOperationContext implements Serializable {
     }
 
     /**
+     * @return {@code True} if data center id is set otherwise {@code false}.
+     */
+    public boolean hasDataCenterId() {
+        return dataCenterId != null;
+    }
+
+    /**
      * See {@link IgniteInternalCache#keepBinary()}.
      *
      * @return New instance of CacheOperationContext with keep binary flag.
@@ -117,6 +158,15 @@ public class CacheOperationContext implements Serializable {
     }
 
     /**
+     * Gets data center ID.
+     *
+     * @return Data center ID, or {@code null} if it is not set.
+     */
+    @Nullable public Byte dataCenterId() {
+        return dataCenterId;
+    }
+
+    /**
      * See {@link IgniteInternalCache#forSubjectId(UUID)}.
      *
      * @param subjId Subject id.

http://git-wip-us.apache.org/repos/asf/ignite/blob/272f397f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 749f4ab..228f795 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -1874,6 +1874,34 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /**
+     * @return Projection for data center id.
+     */
+    @SuppressWarnings("unchecked")
+    public <K, V> IgniteCache<K, V> withDataCenterId(byte dataCenterId) {
+        CacheOperationContext prev = onEnter(gate, opCtx);
+
+        try {
+            CacheOperationContext opCtx0 =
+                new CacheOperationContext(
+                    opCtx != null && opCtx.skipStore(),
+                    opCtx != null ? opCtx.subjectId() : null,
+                    opCtx != null && opCtx.isKeepBinary(),
+                    opCtx != null ? opCtx.expiry() : null,
+                    opCtx != null && opCtx.noRetries(),
+                    dataCenterId);
+
+            return new IgniteCacheProxy<>((GridCacheContext<K, V>)ctx,
+                (GridCacheAdapter<K, V>)delegate,
+                opCtx0,
+                isAsync(),
+                lock);
+        }
+        finally {
+            onLeave(gate, prev);
+        }
+    }
+
+    /**
      * @return Cache with skip store enabled.
      */
     public IgniteCache<K, V> skipStore() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/272f397f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index a7aa71c..38e5c36 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -37,6 +37,7 @@ import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.EntryProcessorResult;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -77,7 +78,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
-import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
@@ -899,11 +899,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     @SuppressWarnings("ConstantConditions")
     private IgniteInternalFuture updateAllAsync0(
-        @Nullable final Map<? extends K, ? extends V> map,
-        @Nullable final Map<? extends K, ? extends EntryProcessor> invokeMap,
+        @Nullable Map<? extends K, ? extends V> map,
+        @Nullable Map<? extends K, ? extends EntryProcessor> invokeMap,
         @Nullable Object[] invokeArgs,
-        @Nullable final Map<KeyCacheObject, GridCacheDrInfo> conflictPutMap,
-        @Nullable final Map<KeyCacheObject, GridCacheVersion> conflictRmvMap,
+        @Nullable Map<KeyCacheObject, GridCacheDrInfo> conflictPutMap,
+        @Nullable Map<KeyCacheObject, GridCacheVersion> conflictRmvMap,
         final boolean retval,
         final boolean rawRetval,
         @Nullable final CacheEntryPredicate[] filter,
@@ -919,6 +919,41 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         CacheOperationContext opCtx = ctx.operationContextPerCall();
 
+        if (opCtx != null && opCtx.hasDataCenterId()) {
+            if (op == GridCacheOperation.TRANSFORM) {
+                Map<KeyCacheObject, GridCacheDrInfo> confMap = new HashMap<>(invokeMap.size());
+
+                for (Map.Entry<? extends K, ? extends EntryProcessor> e : invokeMap.entrySet())
+                    confMap.put(ctx.toCacheKeyObject(e.getKey()), new GridCacheDrInfo((CacheEntryProcessor)e.getValue(),
+                        ctx.versions().next(opCtx.dataCenterId())));
+
+                conflictPutMap = confMap;
+
+                invokeMap = null;
+            }
+            else if (op == GridCacheOperation.DELETE) {
+                Map<KeyCacheObject, GridCacheVersion> confMap = new HashMap<>(map.size());
+
+                for (K key : map.keySet())
+                    confMap.put(ctx.toCacheKeyObject(key), ctx.versions().next(opCtx.dataCenterId()));
+
+                conflictRmvMap = confMap;
+
+                map = null;
+            }
+            else {
+                Map<KeyCacheObject, GridCacheDrInfo> confMap = new HashMap<>(map.size());
+
+                for (Map.Entry<? extends K, ? extends V> e : map.entrySet())
+                    confMap.put(ctx.toCacheKeyObject(e.getKey()), new GridCacheDrInfo(
+                        ctx.toCacheObject(e.getValue()), ctx.versions().next(opCtx.dataCenterId())));
+
+                conflictPutMap = confMap;
+
+                map = null;
+            }
+        }
+
         UUID subjId = ctx.subjectIdPerCall(null, opCtx);
 
         int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
@@ -965,8 +1000,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      * @return Completion future.
      */
     private IgniteInternalFuture removeAllAsync0(
-        @Nullable final Collection<? extends K> keys,
-        @Nullable final Map<KeyCacheObject, GridCacheVersion> conflictMap,
+        @Nullable Collection<? extends K> keys,
+        @Nullable Map<KeyCacheObject, GridCacheVersion> conflictMap,
         final boolean retval,
         boolean rawRetval,
         @Nullable final CacheEntryPredicate[] filter
@@ -990,6 +1025,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
 
+        if (opCtx != null && keys != null && opCtx.hasDataCenterId()) {
+            Map<KeyCacheObject, GridCacheVersion> confMap = new HashMap<>(keys.size());
+
+            for (K key : keys)
+                confMap.put(ctx.toCacheKeyObject(key), ctx.versions().next(opCtx.dataCenterId()));
+
+            conflictMap = confMap;
+
+            keys = null;
+        }
+
         final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
             ctx,
             this,

http://git-wip-us.apache.org/repos/asf/ignite/blob/272f397f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
index 8dfe313..bd3df02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrInfo.java
@@ -67,6 +67,15 @@ public class GridCacheDrInfo implements Externalizable {
     /**
      * Constructor.
      *
+     * @param ver Version.
+     */
+    public GridCacheDrInfo(GridCacheVersion ver) {
+        this.ver = ver;
+    }
+
+    /**
+     * Constructor.
+     *
      * @param proc Entry processor.
      * @param ver Version.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/272f397f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 5bf10bf..3194c20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -28,6 +28,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -2076,6 +2077,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         @Nullable ExpiryPolicy expiryPlc,
         @Nullable EntryProcessor<K, V, Object> entryProcessor,
         @Nullable Object[] invokeArgs,
+        @Nullable GridCacheVersion drVer,
         final boolean retval,
         boolean lockOnly,
         final CacheEntryPredicate[] filter,
@@ -2093,6 +2095,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
             if (entryProcessor != null)
                 transform = true;
 
+            long drTtl = singleRmv ? -1L : CU.TTL_ETERNAL;
+            long drExpireTime = singleRmv ? -1L : CU.EXPIRE_TIME_ETERNAL;
+
             boolean loadMissed = enlistWriteEntry(cacheCtx,
                 cacheKey,
                 val,
@@ -2102,9 +2107,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                 retval,
                 lockOnly,
                 filter,
-                /*drVer*/null,
-                /*drTtl*/-1L,
-                /*drExpireTime*/-1L,
+                /*drVer*/drVer,
+                drTtl,
+                drExpireTime,
                 ret,
                 /*enlisted*/null,
                 skipStore,
@@ -2965,6 +2970,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
 
             CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
 
+            GridCacheVersion drVer = null;
+
+            if (opCtx != null && opCtx.hasDataCenterId())
+                drVer = cacheCtx.versions().next(opCtx.dataCenterId());
+
             KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key);
 
             boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
@@ -2976,6 +2986,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
                 opCtx != null ? opCtx.expiry() : null,
                 entryProcessor,
                 invokeArgs,
+                drVer,
                 retval,
                 /*lockOnly*/false,
                 filter,
@@ -3080,7 +3091,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         @Nullable Map<? extends K, ? extends V> map,
         @Nullable Map<? extends K, ? extends EntryProcessor<K, V, Object>> invokeMap,
         @Nullable final Object[] invokeArgs,
-        @Nullable final Map<KeyCacheObject, GridCacheDrInfo> drMap,
+        @Nullable Map<KeyCacheObject, GridCacheDrInfo> drMap,
         final boolean retval,
         @Nullable final CacheEntryPredicate[] filter
     ) {
@@ -3093,6 +3104,16 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
             return new GridFinishedFuture(e);
         }
 
+        final CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+        if (opCtx != null && opCtx.hasDataCenterId())
+            drMap = (Map<KeyCacheObject, GridCacheDrInfo>)F.viewReadOnly((Map<K, V>)map,
+                new IgniteClosure<V, GridCacheDrInfo>() {
+                    @Override public GridCacheDrInfo apply(V val) {
+                        return new GridCacheDrInfo(cctx.versions().next(opCtx.dataCenterId()));
+                    }
+                });
+
         // Cached entry may be passed only from entry wrapper.
         final Map<?, ?> map0 = map;
         final Map<?, EntryProcessor<K, V, Object>> invokeMap0 = (Map<K, EntryProcessor<K, V, Object>>)invokeMap;
@@ -3121,8 +3142,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
 
             final Collection<KeyCacheObject> enlisted = new ArrayList<>(keySet.size());
 
-            CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
-
             final boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
 
             final IgniteInternalFuture<Void> loadFut = enlistWrite(
@@ -3345,6 +3364,17 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
         else
             keys0 = keys;
 
+        CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
+
+        if (drMap == null && opCtx != null && opCtx.hasDataCenterId()) {
+            Map<K, GridCacheVersion> confMap = new TreeMap<>();
+
+            for (K key : keys)
+                confMap.put(key, cacheCtx.versions().next(opCtx.dataCenterId()));
+
+            drMap = (Map)confMap;
+        }
+
         assert keys0 != null;
 
         if (log.isDebugEnabled()) {
@@ -3378,8 +3408,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
 
         final Collection<KeyCacheObject> enlisted = new ArrayList<>();
 
-        CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
-
         ExpiryPolicy plc;
 
         if (!F.isEmpty(filter))

http://git-wip-us.apache.org/repos/asf/ignite/blob/272f397f/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 27eff0c..801d6ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheOperationContext;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;

http://git-wip-us.apache.org/repos/asf/ignite/blob/272f397f/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index 7116227..55ed357 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -934,7 +934,7 @@ public final class GridTestUtils {
      * @return Resolved path, or {@code null} if file cannot be resolved.
      */
     @Nullable private static File resolvePath(@Nullable String igniteHome, String path) {
-        File file = new File(path).getAbsoluteFile();
+        File file = new File("D:\\projects\\ggprivate\\modules\\core\\src\\test\\config\\tests.properties").getAbsoluteFile();
 
         if (!file.exists()) {
             String home = igniteHome != null ? igniteHome : U.getIgniteHome();


Mime
View raw message