ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [15/49] incubator-ignite git commit: Merge branch 'sprint-3' into ignite-443
Date Tue, 31 Mar 2015 12:48:46 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4285d2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 52c71c1,de180ee..9dfb356
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@@ -184,8 -180,11 +183,11 @@@ public class GridCacheContext<K, V> imp
      /** Cache weak query iterator holder. */
      private CacheWeakQueryIteratorsHolder<Map.Entry<K, V>> itHolder;
  
+     /** Affinity node. */
+     private boolean affNode;
+ 
      /** Conflict resolver. */
 -    private GridCacheVersionAbstractConflictResolver conflictRslvr;
 +    private CacheVersionConflictResolver conflictRslvr;
  
      /** */
      private CacheObjectContext cacheObjCtx;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4285d2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 407b8d5,c918ed4..598130a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@@ -924,10 -895,574 +895,577 @@@ public class GridCacheProcessor extend
      }
  
      /**
-      * Creates shared context.
-      *
-      * @param kernalCtx Kernal context.
-      * @return Shared context.
+      * @param cache Cache.
+      * @throws IgniteCheckedException If failed.
+      */
+     @SuppressWarnings("unchecked")
+     private void onKernalStart(GridCacheAdapter<?, ?> cache) throws IgniteCheckedException {
+         GridCacheContext<?, ?> ctx = cache.context();
+ 
+         // Start DHT cache as well.
+         if (isNearEnabled(ctx)) {
+             GridDhtCacheAdapter dht = ctx.near().dht();
+ 
+             GridCacheContext<?, ?> dhtCtx = dht.context();
+ 
+             for (GridCacheManager mgr : dhtManagers(dhtCtx))
+                 mgr.onKernalStart();
+ 
+             dht.onKernalStart();
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Executed onKernalStart() callback for DHT cache: " + dht.name());
+         }
+ 
+         for (GridCacheManager mgr : F.view(ctx.managers(), F0.notContains(dhtExcludes(ctx))))
+             mgr.onKernalStart();
+ 
+         cache.onKernalStart();
+ 
+         if (ctx.events().isRecordable(EventType.EVT_CACHE_STARTED))
+             ctx.events().addEvent(EventType.EVT_CACHE_STARTED);
+ 
+         if (log.isDebugEnabled())
+             log.debug("Executed onKernalStart() callback for cache [name=" + cache.name() + ", mode=" +
+                 cache.configuration().getCacheMode() + ']');
+     }
+ 
+     /**
+      * @param cache Cache to stop.
+      * @param cancel Cancel flag.
+      */
+     @SuppressWarnings("unchecked")
+     private void onKernalStop(GridCacheAdapter<?, ?> cache, boolean cancel) {
+         GridCacheContext ctx = cache.context();
+ 
+         if (isNearEnabled(ctx)) {
+             GridDhtCacheAdapter dht = ctx.near().dht();
+ 
+             if (dht != null) {
+                 GridCacheContext<?, ?> dhtCtx = dht.context();
+ 
+                 for (GridCacheManager mgr : dhtManagers(dhtCtx))
+                     mgr.onKernalStop(cancel);
+ 
+                 dht.onKernalStop();
+             }
+         }
+ 
+         List<GridCacheManager> mgrs = ctx.managers();
+ 
+         Collection<GridCacheManager> excludes = dhtExcludes(ctx);
+ 
+         // Reverse order.
+         for (ListIterator<GridCacheManager> it = mgrs.listIterator(mgrs.size()); it.hasPrevious(); ) {
+             GridCacheManager mgr = it.previous();
+ 
+             if (!excludes.contains(mgr))
+                 mgr.onKernalStop(cancel);
+         }
+ 
+         cache.onKernalStop();
+ 
+         if (ctx.events().isRecordable(EventType.EVT_CACHE_STOPPED))
+             ctx.events().addEvent(EventType.EVT_CACHE_STOPPED);
+     }
+ 
+     /**
+      * @param cfg Cache configuration to use to create cache.
+      * @return Cache context.
+      * @throws IgniteCheckedException If failed to create cache.
+      */
+     @SuppressWarnings({"unchecked"})
+     private GridCacheContext createCache(CacheConfiguration<?, ?> cfg, CacheObjectContext cacheObjCtx) throws IgniteCheckedException {
+         assert cfg != null;
+ 
+         CacheStore cfgStore = cfg.getCacheStoreFactory() != null ? cfg.getCacheStoreFactory().create() : null;
+ 
+         validate(ctx.config(), cfg, cfgStore);
+ 
+         CacheJtaManagerAdapter jta = JTA.create(cfg.getTransactionManagerLookupClassName() == null);
+ 
+         jta.createTmLookup(cfg);
+ 
+         // Skip suggestions for system caches.
+         if (!sysCaches.contains(maskNull(cfg.getName())))
+             suggestOptimizations(cfg, cfgStore != null);
+ 
+         Collection<Object> toPrepare = new ArrayList<>();
+ 
+         toPrepare.add(jta.tmLookup());
+ 
+         if (cfgStore instanceof GridCacheLoaderWriterStore) {
+             toPrepare.add(((GridCacheLoaderWriterStore)cfgStore).loader());
+             toPrepare.add(((GridCacheLoaderWriterStore)cfgStore).writer());
+         }
+         else
+             toPrepare.add(cfgStore);
+ 
+         prepare(cfg, toPrepare);
+ 
+         U.startLifecycleAware(lifecycleAwares(cfg, jta.tmLookup(), cfgStore));
+ 
+         GridCacheAffinityManager affMgr = new GridCacheAffinityManager();
+         GridCacheEventManager evtMgr = new GridCacheEventManager();
+         GridCacheSwapManager swapMgr = new GridCacheSwapManager(cfg.getCacheMode() == LOCAL || !GridCacheUtils.isNearEnabled(cfg));
+         GridCacheEvictionManager evictMgr = new GridCacheEvictionManager();
+         GridCacheQueryManager qryMgr = queryManager(cfg);
+         CacheContinuousQueryManager contQryMgr = new CacheContinuousQueryManager();
+         CacheDataStructuresManager dataStructuresMgr = new CacheDataStructuresManager();
+         GridCacheTtlManager ttlMgr = new GridCacheTtlManager();
+         GridCacheDrManager drMgr = ctx.createComponent(GridCacheDrManager.class);
++        CacheConflictManager rslvrMgr = ctx.createComponent(CacheConflictManager.class);
+ 
+         GridCacheStoreManager storeMgr = new GridCacheStoreManager(ctx, sesHolders, cfgStore, cfg);
+ 
+         GridCacheContext<?, ?> cacheCtx = new GridCacheContext(
+             ctx,
+             sharedCtx,
+             cfg,
+             ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cfg.getName()),
+ 
+             /*
+              * Managers in starting order!
+              * ===========================
+              */
+             evtMgr,
+             swapMgr,
+             storeMgr,
+             evictMgr,
+             qryMgr,
+             contQryMgr,
+             affMgr,
+             dataStructuresMgr,
+             ttlMgr,
+             drMgr,
 -            jta);
++            jta,
++            rslvrMgr);
+ 
+         cacheCtx.cacheObjectContext(cacheObjCtx);
+ 
+         GridCacheAdapter cache = null;
+ 
+         switch (cfg.getCacheMode()) {
+             case LOCAL: {
+                 switch (cfg.getAtomicityMode()) {
+                     case TRANSACTIONAL: {
+                         cache = new GridLocalCache(cacheCtx);
+ 
+                         break;
+                     }
+                     case ATOMIC: {
+                         cache = new GridLocalAtomicCache(cacheCtx);
+ 
+                         break;
+                     }
+ 
+                     default: {
+                         assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode();
+                     }
+                 }
+ 
+                 break;
+             }
+             case PARTITIONED:
+             case REPLICATED: {
+                 if (GridCacheUtils.isNearEnabled(cfg)) {
+                     switch (cfg.getAtomicityMode()) {
+                         case TRANSACTIONAL: {
+                             cache = new GridNearTransactionalCache(cacheCtx);
+ 
+                             break;
+                         }
+                         case ATOMIC: {
+                             cache = new GridNearAtomicCache(cacheCtx);
+ 
+                             break;
+                         }
+ 
+                         default: {
+                             assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode();
+                         }
+                     }
+                 }
+                 else {
+                     switch (cfg.getAtomicityMode()) {
+                         case TRANSACTIONAL: {
+                             cache = cacheCtx.affinityNode() ?
+                                 new GridDhtColocatedCache(cacheCtx) :
+                                 new GridDhtColocatedCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx));
+ 
+                             break;
+                         }
+                         case ATOMIC: {
+                             cache = cacheCtx.affinityNode() ?
+                                 new GridDhtAtomicCache(cacheCtx) :
+                                 new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx));
+ 
+                             break;
+                         }
+ 
+                         default: {
+                             assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode();
+                         }
+                     }
+                 }
+ 
+                 break;
+             }
+ 
+             default: {
+                 assert false : "Invalid cache mode: " + cfg.getCacheMode();
+             }
+         }
+ 
+         cacheCtx.cache(cache);
+ 
+         GridCacheContext<?, ?> ret = cacheCtx;
+ 
+         /*
+          * Create DHT cache.
+          * ================
+          */
+         if (cfg.getCacheMode() != LOCAL && GridCacheUtils.isNearEnabled(cfg)) {
+             /*
+              * Specifically don't create the following managers
+              * here and reuse the one from Near cache:
+              * 1. GridCacheVersionManager
+              * 2. GridCacheIoManager
+              * 3. GridCacheDeploymentManager
+              * 4. GridCacheQueryManager (note, that we start it for DHT cache though).
+              * 5. CacheContinuousQueryManager (note, that we start it for DHT cache though).
+              * 6. GridCacheDgcManager
+              * 7. GridCacheTtlManager.
+              * ===============================================
+              */
+             swapMgr = new GridCacheSwapManager(true);
+             evictMgr = new GridCacheEvictionManager();
+             evtMgr = new GridCacheEventManager();
+             drMgr = ctx.createComponent(GridCacheDrManager.class);
+ 
+             cacheCtx = new GridCacheContext(
+                 ctx,
+                 sharedCtx,
+                 cfg,
+                 ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cfg.getName()),
+ 
+                 /*
+                  * Managers in starting order!
+                  * ===========================
+                  */
+                 evtMgr,
+                 swapMgr,
+                 storeMgr,
+                 evictMgr,
+                 qryMgr,
+                 contQryMgr,
+                 affMgr,
+                 dataStructuresMgr,
+                 ttlMgr,
+                 drMgr,
 -                jta);
++                jta,
++                rslvrMgr);
+ 
+             cacheCtx.cacheObjectContext(cacheObjCtx);
+ 
+             GridDhtCacheAdapter dht = null;
+ 
+             switch (cfg.getAtomicityMode()) {
+                 case TRANSACTIONAL: {
+                     assert cache instanceof GridNearTransactionalCache;
+ 
+                     GridNearTransactionalCache near = (GridNearTransactionalCache)cache;
+ 
+                     GridDhtCache dhtCache = cacheCtx.affinityNode() ?
+                         new GridDhtCache(cacheCtx) :
+                         new GridDhtCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx));
+ 
+                     dhtCache.near(near);
+ 
+                     near.dht(dhtCache);
+ 
+                     dht = dhtCache;
+ 
+                     break;
+                 }
+                 case ATOMIC: {
+                     assert cache instanceof GridNearAtomicCache;
+ 
+                     GridNearAtomicCache near = (GridNearAtomicCache)cache;
+ 
+                     GridDhtAtomicCache dhtCache = cacheCtx.affinityNode() ?
+                         new GridDhtAtomicCache(cacheCtx) :
+                         new GridDhtAtomicCache(cacheCtx, new GridNoStorageCacheMap(cacheCtx));
+ 
+                     dhtCache.near(near);
+ 
+                     near.dht(dhtCache);
+ 
+                     dht = dhtCache;
+ 
+                     break;
+                 }
+ 
+                 default: {
+                     assert false : "Invalid cache atomicity mode: " + cfg.getAtomicityMode();
+                 }
+             }
+ 
+             cacheCtx.cache(dht);
+         }
+ 
+         return ret;
+     }
+ 
+     /**
+      * Gets a collection of currentlty started caches.
+      *
+      * @return Collection of started cache names.
+      */
+     public Collection<String> cacheNames() {
+         return F.viewReadOnly(registeredCaches.keySet(),
+             new IgniteClosure<String, String>() {
+                 @Override public String apply(String s) {
+                     return unmaskNull(s);
+                 }
+             });
+     }
+ 
+     /**
+      * Gets cache mode.
+      *
+      * @param cacheName Cache name to check.
+      * @return Cache mode.
+      */
+     public CacheMode cacheMode(String cacheName) {
+         DynamicCacheDescriptor desc = registeredCaches.get(maskNull(cacheName));
+ 
+         return desc != null ? desc.cacheConfiguration().getCacheMode() : null;
+     }
+ 
+     /**
+      * @param req Request to check.
+      * @return {@code True} if change request was registered to apply.
+      */
+     @SuppressWarnings("IfMayBeConditional")
+     public boolean dynamicCacheRegistered(DynamicCacheChangeRequest req) {
+         DynamicCacheDescriptor desc = registeredCaches.get(maskNull(req.cacheName()));
+ 
+         if (desc != null) {
+             if (desc.deploymentId().equals(req.deploymentId())) {
+                 if (req.start())
+                     return !desc.cancelled();
+                 else
+                     return desc.cancelled();
+             }
+ 
+             // If client requested cache start
+             if (req.initiatingNodeId() != null)
+                 return true;
+         }
+ 
+         return false;
+     }
+ 
+     /**
+      * @param reqs Requests to start.
+      * @throws IgniteCheckedException If failed to start cache.
+      */
+     @SuppressWarnings("TypeMayBeWeakened")
+     public void prepareCachesStart(
+         Collection<DynamicCacheChangeRequest> reqs,
+         AffinityTopologyVersion topVer
+     ) throws IgniteCheckedException {
+         for (DynamicCacheChangeRequest req : reqs) {
+             assert req.start();
+ 
+             prepareCacheStart(
+                 req.startCacheConfiguration(),
+                 req.nearCacheConfiguration(),
+                 req.clientStartOnly(),
+                 req.initiatingNodeId(),
+                 req.deploymentId(),
+                 topVer
+             );
+         }
+ 
+         // Start statically configured caches received from remote nodes during exchange.
+         for (DynamicCacheDescriptor desc : registeredCaches.values()) {
+             if (desc.staticallyConfigured() && !desc.locallyConfigured()) {
+                 if (desc.onStart()) {
+                     prepareCacheStart(
+                         desc.cacheConfiguration(),
+                         null,
+                         false,
+                         null,
+                         desc.deploymentId(),
+                         topVer
+                     );
+                 }
+             }
+         }
+     }
+ 
+     /**
+      * @param cfg Start configuration.
+      * @param nearCfg Near configuration.
+      * @param clientStartOnly Client only start request.
+      * @param initiatingNodeId Initiating node ID.
+      * @param deploymentId Deployment ID.
+      */
+     private void prepareCacheStart(
+         CacheConfiguration cfg,
+         NearCacheConfiguration nearCfg,
+         boolean clientStartOnly,
+         UUID initiatingNodeId,
+         IgniteUuid deploymentId,
+         AffinityTopologyVersion topVer
+     ) throws IgniteCheckedException {
+         CacheConfiguration ccfg = new CacheConfiguration(cfg);
+ 
+         IgnitePredicate nodeFilter = ccfg.getNodeFilter();
+ 
+         ClusterNode locNode = ctx.discovery().localNode();
+ 
+         boolean affNodeStart = !clientStartOnly && nodeFilter.apply(locNode);
+         boolean clientNodeStart = locNode.id().equals(initiatingNodeId);
+ 
+         if (sharedCtx.cacheContext(CU.cacheId(cfg.getName())) != null)
+             return;
+ 
+         if (affNodeStart || clientNodeStart) {
+             if (clientNodeStart && !affNodeStart) {
+                 if (nearCfg != null)
+                     ccfg.setNearConfiguration(nearCfg);
+             }
+ 
+             CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(null, ccfg.getName(), ccfg);
+ 
+             GridCacheContext cacheCtx = createCache(ccfg, cacheObjCtx);
+ 
+             cacheCtx.startTopologyVersion(topVer);
+ 
+             cacheCtx.dynamicDeploymentId(deploymentId);
+ 
+             sharedCtx.addCacheContext(cacheCtx);
+ 
+             caches.put(maskNull(cacheCtx.name()), cacheCtx.cache());
+ 
+             startCache(cacheCtx.cache());
+             onKernalStart(cacheCtx.cache());
+         }
+     }
+ 
+     /**
+      * @param req Stop request.
+      */
+     public void blockGateway(DynamicCacheChangeRequest req) {
+         assert req.stop();
+ 
+         // Break the proxy before exchange future is done.
+         IgniteCacheProxy<?, ?> proxy = jCacheProxies.get(maskNull(req.cacheName()));
+ 
+         if (proxy != null)
+             proxy.gate().block();
+     }
+ 
+     /**
+      * @param req Request.
+      */
+     private void stopGateway(DynamicCacheChangeRequest req) {
+         assert req.stop();
+ 
+         // Break the proxy before exchange future is done.
+         IgniteCacheProxy<?, ?> proxy = jCacheProxies.remove(maskNull(req.cacheName()));
+ 
+         if (proxy != null)
+             proxy.gate().onStopped();
+     }
+ 
+     /**
+      * @param req Stop request.
+      */
+     public void prepareCacheStop(DynamicCacheChangeRequest req) {
+         assert req.stop();
+ 
+         GridCacheAdapter<?, ?> cache = caches.remove(maskNull(req.cacheName()));
+ 
+         if (cache != null) {
+             GridCacheContext<?, ?> ctx = cache.context();
+ 
+             sharedCtx.removeCacheContext(ctx);
+ 
+             assert req.deploymentId().equals(ctx.dynamicDeploymentId()) : "Different deployment IDs [req=" + req +
+                 ", ctxDepId=" + ctx.dynamicDeploymentId() + ']';
+ 
+             onKernalStop(cache, true);
+             stopCache(cache, true);
+         }
+     }
+ 
+     /**
+      * Callback invoked when first exchange future for dynamic cache is completed.
+      *
+      * @param topVer Completed topology version.
+      * @param reqs Change requests.
+      */
+     @SuppressWarnings("unchecked")
+     public void onExchangeDone(
+         AffinityTopologyVersion topVer,
+         Collection<DynamicCacheChangeRequest> reqs,
+         Throwable err
+     ) {
+         for (GridCacheAdapter<?, ?> cache : caches.values()) {
+             GridCacheContext<?, ?> cacheCtx = cache.context();
+ 
+             if (F.eq(cacheCtx.startTopologyVersion(), topVer)) {
+                 cacheCtx.preloader().onInitialExchangeComplete(err);
+ 
+                 String masked = maskNull(cacheCtx.name());
+ 
+                 jCacheProxies.put(masked, new IgniteCacheProxy(cache.context(), cache, null, false));
+             }
+         }
+ 
+         if (!F.isEmpty(reqs) && err == null) {
+             for (DynamicCacheChangeRequest req : reqs) {
+                 String masked = maskNull(req.cacheName());
+ 
+                 if (req.stop()) {
+                     stopGateway(req);
+ 
+                     prepareCacheStop(req);
+ 
+                     DynamicCacheDescriptor desc = registeredCaches.get(masked);
+ 
+                     if (desc != null && desc.cancelled() && desc.deploymentId().equals(req.deploymentId()))
+                         registeredCaches.remove(masked, desc);
+                 }
+ 
+                 completeStartFuture(req);
+             }
+         }
+     }
+ 
+     /**
+      * @param req Request to complete future for.
+      */
+     public void completeStartFuture(DynamicCacheChangeRequest req) {
+         DynamicCacheStartFuture fut = (DynamicCacheStartFuture)pendingFuts.get(maskNull(req.cacheName()));
+ 
+         assert req.deploymentId() != null;
+         assert fut == null || fut.deploymentId != null;
+ 
+         if (fut != null && fut.deploymentId().equals(req.deploymentId()) &&
+             F.eq(req.initiatingNodeId(), ctx.localNodeId()))
+             fut.onDone();
+     }
+ 
+     /**
+      * Creates shared context.
+      *
+      * @param kernalCtx Kernal context.
+      * @return Shared context.
       */
      @SuppressWarnings("unchecked")
      private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4285d2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/GridCacheDrManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4285d2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/dr/os/GridOsCacheDrManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4285d2b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java
index 05311db,0000000..a18eedd
mode 100644,000000..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/CacheVersionConflictResolver.java
@@@ -1,85 -1,0 +1,85 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.internal.processors.cache.version;
 +
 +import org.apache.ignite.*;
 +import org.apache.ignite.internal.processors.cache.*;
 +
 +/**
 + * Cache version conflict resolver.
 + */
 +public class CacheVersionConflictResolver {
 +    /**
 +     * Resolve the conflict.
 +     *
 +     * @param oldEntry Old entry.
 +     * @param newEntry New entry.
 +     * @param atomicVerComparator Whether to use atomic version comparator.
 +     * @return Conflict resolution context.
 +     * @throws IgniteCheckedException If failed.
 +     */
 +    public <K, V> GridCacheVersionConflictContext<K, V> resolve(GridCacheVersionedEntryEx<K, V> oldEntry,
 +        GridCacheVersionedEntryEx<K, V> newEntry, boolean atomicVerComparator) throws IgniteCheckedException {
 +        GridCacheVersionConflictContext<K, V> ctx = new GridCacheVersionConflictContext<>(oldEntry, newEntry);
 +
 +        resolve0(ctx, oldEntry, newEntry, atomicVerComparator);
 +
 +        return ctx;
 +    }
 +
 +    /**
 +     * Internal conflict resolution routine.
 +     *
 +     * @param ctx Context.
 +     * @param oldEntry Old entry.
 +     * @param newEntry New entry.
 +     * @param atomicVerComparator Whether to use atomic version comparator.
 +     * @throws IgniteCheckedException If failed.
 +     */
 +    protected <K, V> void resolve0(GridCacheVersionConflictContext<K, V> ctx,
 +        GridCacheVersionedEntryEx<K, V> oldEntry, GridCacheVersionedEntryEx<K, V> newEntry,
 +        boolean atomicVerComparator) throws IgniteCheckedException {
 +        if (newEntry.dataCenterId() != oldEntry.dataCenterId())
 +            ctx.useNew();
 +        else {
 +            if (oldEntry.isStartVersion())
 +                ctx.useNew();
 +            else {
 +                if (atomicVerComparator) {
 +                    // Handle special case when version check using ATOMIC cache comparator is required.
-                     if (GridCacheMapEntry.ATOMIC_VER_COMPARATOR.compare(oldEntry.version(), newEntry.version()) >= 0)
++                    if (GridCacheMapEntry.ATOMIC_VER_COMPARATOR.compare(oldEntry.version(), newEntry.version(), false) >= 0)
 +                        ctx.useOld();
 +                    else
 +                        ctx.useNew();
 +                }
 +                else {
 +                    long topVerDiff = newEntry.topologyVersion() - oldEntry.topologyVersion();
 +
 +                    if (topVerDiff > 0)
 +                        ctx.useNew();
 +                    else if (topVerDiff < 0)
 +                        ctx.useOld();
 +                    else if (newEntry.order() > oldEntry.order())
 +                        ctx.useNew();
 +                    else
 +                        ctx.useOld();
 +                }
 +            }
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b4285d2b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java
----------------------------------------------------------------------


Mime
View raw message