Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A474717786 for ; Wed, 10 Jun 2015 14:11:40 +0000 (UTC) Received: (qmail 23856 invoked by uid 500); 10 Jun 2015 14:11:40 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 23783 invoked by uid 500); 10 Jun 2015 14:11:40 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 23767 invoked by uid 99); 10 Jun 2015 14:11:40 -0000 Received: from Unknown (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Jun 2015 14:11:40 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id ED14BCD03F for ; Wed, 10 Jun 2015 14:11:39 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.79 X-Spam-Level: * X-Spam-Status: No, score=1.79 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, T_RP_MATCHES_RCVD=-0.01] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id fcqDmKk77rk6 for ; Wed, 10 Jun 2015 14:11:24 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id DFA2347BEB for ; Wed, 10 Jun 2015 14:11:23 +0000 (UTC) Received: (qmail 23276 invoked by uid 99); 10 Jun 2015 14:11:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Jun 2015 14:11:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4E708E0332; Wed, 10 Jun 2015 14:11:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Wed, 10 Jun 2015 14:11:45 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [24/28] incubator-ignite git commit: ignite-545: merge from sprint-6 http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 0e1a9c2..871cd77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -27,6 +27,7 @@ import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.events.*; import org.apache.ignite.internal.*; +import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.processors.*; import org.apache.ignite.internal.processors.affinity.*; import org.apache.ignite.internal.processors.cache.datastructures.*; @@ -47,7 +48,6 @@ import org.apache.ignite.internal.processors.plugin.*; import org.apache.ignite.internal.processors.query.*; import org.apache.ignite.internal.util.*; import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; @@ -153,7 +153,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { cfg.setMemoryMode(DFLT_MEMORY_MODE); if (cfg.getNodeFilter() == null) - cfg.setNodeFilter(CacheConfiguration.SERVER_NODES); + cfg.setNodeFilter(CacheConfiguration.ALL_NODES); if (cfg.getAffinity() == null) { if (cfg.getCacheMode() == PARTITIONED) { @@ -541,10 +541,10 @@ public class GridCacheProcessor extends GridProcessorAdapter { maxRebalanceOrder = validatePreloadOrder(ctx.config().getCacheConfiguration()); - ctx.discovery().setCustomEventListener(new GridPlainInClosure() { - @Override public void apply(Serializable evt) { - if (evt instanceof DynamicCacheChangeBatch) - onCacheChangeRequested((DynamicCacheChangeBatch)evt); + ctx.discovery().setCustomEventListener(DynamicCacheChangeBatch.class, + new CustomEventListener() { + @Override public void onCustomEvent(ClusterNode snd, DynamicCacheChangeBatch msg) { + onCacheChangeRequested(msg); } }); @@ -567,7 +567,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheConfiguration[] cfgs = ctx.config().getCacheConfiguration(); - sharedCtx = createSharedContext(ctx); + sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(ctx, + ctx.config().getCacheStoreSessionListenerFactories())); ctx.performance().add("Disable serializable transactions (set 'txSerializableEnabled' to false)", !ctx.config().getTransactionConfiguration().isTxSerializableEnabled()); @@ -622,9 +623,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { ctx.discovery().setCacheFilter( cfg.getName(), cfg.getNodeFilter(), - cfg.getNearConfiguration() != null, + cfg.getNearConfiguration() != null && cfg.getCacheMode() == PARTITIONED, cfg.getCacheMode() == LOCAL); + ctx.discovery().addClientNode(cfg.getName(), + ctx.localNodeId(), + cfg.getNearConfiguration() != null); + if (!cacheType.userCache()) stopSeq.addLast(cfg.getName()); else @@ -669,6 +674,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) { for (ClusterNode n : ctx.discovery().remoteNodes()) { + if (n.attribute(ATTR_CONSISTENCY_CHECK_SKIPPED)) + continue; + checkTransactionConfiguration(n); DeploymentMode locDepMode = ctx.config().getDeploymentMode(); @@ -683,7 +691,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (rmtCfg != null) { CacheConfiguration locCfg = desc.cacheConfiguration(); - checkCache(locCfg, rmtCfg, n); + checkCache(locCfg, rmtCfg, n, desc); // Check plugin cache configurations. CachePluginManager pluginMgr = desc.pluginManager(); @@ -706,12 +714,15 @@ public class GridCacheProcessor extends GridProcessorAdapter { IgnitePredicate filter = ccfg.getNodeFilter(); - if (filter.apply(locNode)) { + boolean loc = desc.locallyConfigured(); + + if (loc || CU.affinityNode(locNode, filter)) { CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); CachePluginManager pluginMgr = desc.pluginManager(); - GridCacheContext ctx = createCache(ccfg, pluginMgr, desc.cacheType(), cacheObjCtx); + GridCacheContext ctx = createCache( + ccfg, pluginMgr, desc.cacheType(), cacheObjCtx, desc.updatesAllowed()); ctx.dynamicDeploymentId(desc.deploymentId()); @@ -754,8 +765,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { int order = cfg.getRebalanceOrder(); if (order > 0 && order != maxRebalanceOrder && cfg.getCacheMode() != LOCAL) { - GridCompoundFuture fut = (GridCompoundFuture)preloadFuts - .get(order); + GridCompoundFuture fut = (GridCompoundFuture)preloadFuts.get(order); if (fut == null) { fut = new GridCompoundFuture<>(); @@ -774,18 +784,31 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (GridCacheAdapter cache : caches.values()) onKernalStart(cache); + boolean utilityCacheStarted = false; + // Wait for caches in SYNC preload mode. - for (GridCacheAdapter cache : caches.values()) { - CacheConfiguration cfg = cache.configuration(); + for (CacheConfiguration cfg : ctx.config().getCacheConfiguration()) { + GridCacheAdapter cache = caches.get(maskNull(cfg.getName())); + + if (cache != null) { + if (cfg.getRebalanceMode() == SYNC) { + if (cfg.getCacheMode() == REPLICATED || + (cfg.getCacheMode() == PARTITIONED && cfg.getRebalanceDelay() >= 0)) { + cache.preloader().syncFuture().get(); + + if (CU.isUtilityCache(cache.name())) { + ctx.cacheObjects().onUtilityCacheStarted(); - if (cfg.getRebalanceMode() == SYNC) { - if (cfg.getCacheMode() == REPLICATED || - (cfg.getCacheMode() == PARTITIONED && cfg.getRebalanceDelay() >= 0)) - cache.preloader().syncFuture().get(); + utilityCacheStarted = true; + } + } + } } } - ctx.cacheObjects().onCacheProcessorStarted(); + assert caches.containsKey(CU.MARSH_CACHE_NAME) : "Marshaller cache should be started"; + assert caches.containsKey(CU.UTILITY_CACHE_NAME) : "Utility cache should be started"; + assert utilityCacheStarted; } /** {@inheritDoc} */ @@ -812,6 +835,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { mgr.stop(cancel); } + CU.stopStoreSessionListeners(ctx, sharedCtx.storeSessionListeners()); + sharedCtx.cleanup(); if (log.isDebugEnabled()) @@ -1047,7 +1072,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { private GridCacheContext createCache(CacheConfiguration cfg, @Nullable CachePluginManager pluginMgr, CacheType cacheType, - CacheObjectContext cacheObjCtx) + CacheObjectContext cacheObjCtx, + boolean updatesAllowed) throws IgniteCheckedException { assert cfg != null; @@ -1105,6 +1131,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { cfg, cacheType, ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cfg.getName()), + updatesAllowed, /* * Managers in starting order! @@ -1234,6 +1261,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { cfg, cacheType, ctx.discovery().cacheAffinityNode(ctx.discovery().localNode(), cfg.getName()), + true, /* * Managers in starting order! @@ -1423,7 +1451,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { ClusterNode locNode = ctx.discovery().localNode(); - boolean affNodeStart = !clientStartOnly && nodeFilter.apply(locNode); + boolean affNodeStart = !clientStartOnly && CU.affinityNode(locNode, nodeFilter); boolean clientNodeStart = locNode.id().equals(initiatingNodeId); if (sharedCtx.cacheContext(CU.cacheId(cfg.getName())) != null) @@ -1437,7 +1465,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg); - GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx); + GridCacheContext cacheCtx = createCache(ccfg, null, cacheType, cacheObjCtx, true); cacheCtx.startTopologyVersion(topVer); @@ -1562,10 +1590,12 @@ public class GridCacheProcessor extends GridProcessorAdapter { * Creates shared context. * * @param kernalCtx Kernal context. + * @param storeSesLsnrs Store session listeners. * @return Shared context. */ @SuppressWarnings("unchecked") - private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx) { + private GridCacheSharedContext createSharedContext(GridKernalContext kernalCtx, + Collection storeSesLsnrs) { IgniteTxManager tm = new IgniteTxManager(); GridCacheMvccManager mvccMgr = new GridCacheMvccManager(); GridCacheVersionManager verMgr = new GridCacheVersionManager(); @@ -1580,7 +1610,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { mvccMgr, depMgr, exchMgr, - ioMgr + ioMgr, + storeSesLsnrs ); } @@ -1867,7 +1898,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { // Check if we were asked to start a near cache. if (nearCfg != null) { - if (descCfg.getNodeFilter().apply(ctx.discovery().localNode())) { + if (CU.affinityNode(ctx.discovery().localNode(), descCfg.getNodeFilter())) { // If we are on a data node and near cache was enabled, return success, else - fail. if (descCfg.getNearConfiguration() != null) return new GridFinishedFuture<>(); @@ -1914,7 +1945,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { return new GridFinishedFuture<>(new CacheExistsException("Failed to start near cache " + "(a cache with the given name is not started): " + cacheName)); - if (ccfg.getNodeFilter().apply(ctx.discovery().localNode())) { + if (CU.affinityNode(ctx.discovery().localNode(), ccfg.getNodeFilter())) { if (ccfg.getNearConfiguration() != null) return new GridFinishedFuture<>(); else @@ -2202,11 +2233,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** * Checks that remote caches has configuration compatible with the local. * + * @param locCfg Local configuration. + * @param rmtCfg Remote configuration. * @param rmtNode Remote node. + * @param desc Cache descriptor. * @throws IgniteCheckedException If check failed. */ - private void checkCache(CacheConfiguration locCfg, CacheConfiguration rmtCfg, ClusterNode rmtNode) - throws IgniteCheckedException { + private void checkCache(CacheConfiguration locCfg, CacheConfiguration rmtCfg, ClusterNode rmtNode, + DynamicCacheDescriptor desc) throws IgniteCheckedException { ClusterNode locNode = ctx.discovery().localNode(); UUID rmt = rmtNode.id(); @@ -2214,6 +2248,9 @@ public class GridCacheProcessor extends GridProcessorAdapter { GridCacheAttributes rmtAttr = new GridCacheAttributes(rmtCfg); GridCacheAttributes locAttr = new GridCacheAttributes(locCfg); + boolean isLocAff = CU.affinityNode(locNode, locCfg.getNodeFilter()); + boolean isRmtAff = CU.affinityNode(rmtNode, rmtCfg.getNodeFilter()); + CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cacheMode", "Cache mode", locAttr.cacheMode(), rmtAttr.cacheMode(), true); @@ -2227,8 +2264,18 @@ public class GridCacheProcessor extends GridProcessorAdapter { CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "cachePreloadMode", "Cache preload mode", locAttr.cacheRebalanceMode(), rmtAttr.cacheRebalanceMode(), true); - if (locCfg.getAtomicityMode() == TRANSACTIONAL || - (rmtCfg.getNodeFilter().apply(rmtNode) && locCfg.getNodeFilter().apply(locNode))) + boolean checkStore; + + if (!isLocAff && isRmtAff && locCfg.getAtomicityMode() == TRANSACTIONAL) { + checkStore = locAttr.storeFactoryClassName() != null; + + if (locAttr.storeFactoryClassName() == null && rmtAttr.storeFactoryClassName() != null) + desc.updatesAllowed(false); + } + else + checkStore = isLocAff && isRmtAff; + + if (checkStore) CU.checkAttributeMismatch(log, rmtAttr.cacheName(), rmt, "storeFactory", "Store factory", locAttr.storeFactoryClassName(), rmtAttr.storeFactoryClassName(), true); @@ -2547,7 +2594,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Cache instance for given name. * @throws IgniteCheckedException If failed. */ - public IgniteCache publicJCache(@Nullable String cacheName) throws IgniteCheckedException { + public IgniteCacheProxy publicJCache(@Nullable String cacheName) throws IgniteCheckedException { return publicJCache(cacheName, true); } @@ -2561,7 +2608,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If failed. */ @SuppressWarnings({"unchecked", "ConstantConditions"}) - @Nullable public IgniteCache publicJCache(@Nullable String cacheName, boolean failIfNotStarted) + @Nullable public IgniteCacheProxy publicJCache(@Nullable String cacheName, boolean failIfNotStarted) throws IgniteCheckedException { if (log.isDebugEnabled()) @@ -2569,7 +2616,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { String masked = maskNull(cacheName); - IgniteCache cache = (IgniteCache)jCacheProxies.get(masked); + IgniteCacheProxy cache = jCacheProxies.get(masked); DynamicCacheDescriptor desc = registeredCaches.get(masked); @@ -2579,7 +2626,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { if (cache == null) cache = startJCache(cacheName, failIfNotStarted); - return cache; + return (IgniteCacheProxy)cache; } /** @@ -2589,7 +2636,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @return Cache instance for given name. * @throws IgniteCheckedException If failed. */ - private IgniteCache startJCache(String cacheName, boolean failIfNotStarted) throws IgniteCheckedException { + private IgniteCacheProxy startJCache(String cacheName, boolean failIfNotStarted) throws IgniteCheckedException { String masked = maskNull(cacheName); DynamicCacheDescriptor desc = registeredCaches.get(masked); @@ -2619,7 +2666,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { F.first(initiateCacheChanges(F.asList(req))).get(); - IgniteCache cache = jCacheProxies.get(masked); + IgniteCacheProxy cache = jCacheProxies.get(masked); if (cache == null && failIfNotStarted) throw new IllegalArgumentException("Cache is not started: " + cacheName); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 55d2f84..63ba242 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -329,7 +329,7 @@ public class GridCacheProxyImpl implements IgniteInternalCache, Exte } /** {@inheritDoc} */ - @Nullable @Override public Map getAllOutTx(List keys) throws IgniteCheckedException { + @Nullable @Override public Map getAllOutTx(Set keys) throws IgniteCheckedException { CacheOperationContext prev = gate.enter(opCtx); try { @@ -341,6 +341,18 @@ public class GridCacheProxyImpl implements IgniteInternalCache, Exte } /** {@inheritDoc} */ + @Nullable @Override public IgniteInternalFuture> getAllOutTxAsync(Set keys) { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.getAllOutTxAsync(keys); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public boolean isIgfsDataCache() { CacheOperationContext prev = gate.enter(opCtx); @@ -741,6 +753,18 @@ public class GridCacheProxyImpl implements IgniteInternalCache, Exte } /** {@inheritDoc} */ + @Override public Set keySetx() { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.keySetx(); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ @Override public Set primaryKeySet() { CacheOperationContext prev = gate.enter(opCtx); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 294c2b0..1071ef2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; @@ -26,6 +27,7 @@ import org.apache.ignite.internal.managers.deployment.*; import org.apache.ignite.internal.managers.discovery.*; import org.apache.ignite.internal.managers.eventstorage.*; import org.apache.ignite.internal.processors.affinity.*; +import org.apache.ignite.internal.processors.cache.store.*; import org.apache.ignite.internal.processors.cache.transactions.*; import org.apache.ignite.internal.processors.cache.version.*; import org.apache.ignite.internal.processors.timeout.*; @@ -76,6 +78,9 @@ public class GridCacheSharedContext { /** Preloaders start future. */ private IgniteInternalFuture preloadersStartFut; + /** Store session listeners. */ + private Collection storeSesLsnrs; + /** * @param txMgr Transaction manager. * @param verMgr Version manager. @@ -88,7 +93,8 @@ public class GridCacheSharedContext { GridCacheMvccManager mvccMgr, GridCacheDeploymentManager depMgr, GridCachePartitionExchangeManager exchMgr, - GridCacheIoManager ioMgr + GridCacheIoManager ioMgr, + Collection storeSesLsnrs ) { this.kernalCtx = kernalCtx; this.mvccMgr = add(mvccMgr); @@ -97,6 +103,7 @@ public class GridCacheSharedContext { this.depMgr = add(depMgr); this.exchMgr = add(exchMgr); this.ioMgr = add(ioMgr); + this.storeSesLsnrs = storeSesLsnrs; txMetrics = new TransactionMetricsAdapter(); @@ -427,27 +434,38 @@ public class GridCacheSharedContext { * @param tx Transaction to check. * @param activeCacheIds Active cache IDs. * @param cacheCtx Cache context. - * @return {@code True} if cross-cache transaction can include this new cache. + * @return Error message if transactions are incompatible. */ - public boolean txCompatible(IgniteInternalTx tx, Iterable activeCacheIds, GridCacheContext cacheCtx) { - if (cacheCtx.systemTx() ^ tx.system()) - return false; + @Nullable public String verifyTxCompatibility(IgniteInternalTx tx, Iterable activeCacheIds, + GridCacheContext cacheCtx) { + if (cacheCtx.systemTx() && !tx.system()) + return "system cache can be enlisted only in system transaction"; + + if (!cacheCtx.systemTx() && tx.system()) + return "non-system cache can't be enlisted in system transaction"; for (Integer cacheId : activeCacheIds) { GridCacheContext activeCacheCtx = cacheContext(cacheId); - // System transactions may sap only one cache. if (cacheCtx.systemTx()) { if (activeCacheCtx.cacheId() != cacheCtx.cacheId()) - return false; + return "system transaction can include only one cache"; } - // Check that caches have the same store. - if (activeCacheCtx.store().store() != cacheCtx.store().store()) - return false; + CacheStoreManager store = cacheCtx.store(); + CacheStoreManager activeStore = activeCacheCtx.store(); + + if (store.isLocal() != activeStore.isLocal()) + return "caches with local and non-local stores can't be enlisted in one transaction"; + + if (store.isWriteBehind() != activeStore.isWriteBehind()) + return "caches with different write-behind setting can't be enlisted in one transaction"; + + // If local and write-behind validations passed, this must be true. + assert store.isWriteToStoreFromDht() == activeStore.isWriteToStoreFromDht(); } - return true; + return null; } /** @@ -499,6 +517,7 @@ public class GridCacheSharedContext { /** * @param tx Transaction to rollback. * @throws IgniteCheckedException If failed. + * @return Rollback future. */ public IgniteInternalFuture rollbackTxAsync(IgniteInternalTx tx) throws IgniteCheckedException { Collection cacheIds = tx.activeCacheIds(); @@ -512,6 +531,13 @@ public class GridCacheSharedContext { } /** + * @return Store session listeners. + */ + @Nullable public Collection storeSessionListeners() { + return storeSesLsnrs; + } + + /** * @param mgr Manager to add. * @return Added manager. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java index eb82218..772e849 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java @@ -121,6 +121,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { warnFirstEvict(); writeToSwap(part, cctx.toCacheKeyObject(kb), vb); + + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapEvict(); } catch (IgniteCheckedException e) { log.error("Failed to unmarshal off-heap entry [part=" + part + ", hash=" + hash + ']', e); @@ -395,8 +398,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * @return Reconstituted swap entry or {@code null} if entry is obsolete. * @throws IgniteCheckedException If failed. */ - @Nullable private X swapEntry(X e) throws IgniteCheckedException - { + @Nullable private X swapEntry(X e) throws IgniteCheckedException { assert e != null; checkIteratorQueue(); @@ -425,9 +427,15 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); // First check off-heap store. - if (offheapEnabled) - if (offheap.contains(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()))) + if (offheapEnabled) { + boolean contains = offheap.contains(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapRead(contains); + + if (contains) return true; + } if (swapEnabled) { assert key != null; @@ -436,6 +444,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { new SwapKey(key.value(cctx.cacheObjectContext(), false), part, key.valueBytes(cctx.cacheObjectContext())), cctx.deploy().globalLoader()); + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onSwapRead(valBytes != null); + return valBytes != null; } @@ -444,7 +455,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { /** * @param key Key to read. - * @param keyBytes Key bytes. * @param part Key partition. * @param entryLocked {@code True} if cache entry is locked. * @param readOffheap Read offheap flag. @@ -481,6 +491,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (readOffheap && offheapEnabled) { byte[] bytes = offheap.get(spaceName, part, key, keyBytes); + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapRead(bytes != null); + if (bytes != null) return swapEntry(unmarshalSwapEntry(bytes)); } @@ -524,6 +537,13 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (offheapEnabled) { byte[] entryBytes = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + if (cctx.config().isStatisticsEnabled()) { + if (entryBytes != null) + cctx.cache().metrics0().onOffHeapRemove(); + + cctx.cache().metrics0().onOffHeapRead(entryBytes != null); + } + if (entryBytes != null) { GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes)); @@ -567,8 +587,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * @return Value from swap or {@code null}. * @throws IgniteCheckedException If failed. */ - @Nullable private GridCacheSwapEntry readAndRemoveSwap(final KeyCacheObject key, - final int part) + @Nullable private GridCacheSwapEntry readAndRemoveSwap(final KeyCacheObject key, final int part) throws IgniteCheckedException { if (!swapEnabled) return null; @@ -582,6 +601,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { swapMgr.remove(spaceName, swapKey, new CI1() { @Override public void apply(byte[] rmv) { + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onSwapRead(rmv != null); + if (rmv != null) { try { GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv)); @@ -611,6 +633,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { null); } + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onSwapRemove(); + // Always fire this event, since preloading depends on it. onUnswapped(part, key, entry); @@ -649,12 +674,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (!offheapEnabled && !swapEnabled) return null; - return read(entry.key(), - entry.key().valueBytes(cctx.cacheObjectContext()), - entry.partition(), - locked, - readOffheap, - readSwap); + return read(entry.key(), entry.key().valueBytes(cctx.cacheObjectContext()), entry.partition(), locked, + readOffheap, readSwap); } /** @@ -730,6 +751,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { final GridCacheQueryManager qryMgr = cctx.queries(); Collection unprocessedKeys = null; + final Collection res = new ArrayList<>(keys.size()); // First try removing from offheap. @@ -737,8 +759,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { for (KeyCacheObject key : keys) { int part = cctx.affinity().partition(key); - byte[] entryBytes = - offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + byte[] entryBytes = offheap.remove(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + + if(entryBytes != null && cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapRemove(); if (entryBytes != null) { GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes)); @@ -848,6 +872,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { null); } + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onSwapRemove(); + // Always fire this event, since preloading depends on it. onUnswapped(swapKey.partition(), key, entry); @@ -880,7 +907,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); - return offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + + if(rmv && cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapRemove(); + + return rmv; } /** @@ -925,6 +957,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { return; try { + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onSwapRemove(); + GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv)); if (entry == null) @@ -942,11 +977,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { // First try offheap. if (offheapEnabled) { - byte[] val = offheap.remove(spaceName, - part, - key.value(cctx.cacheObjectContext(), false), + byte[] val = offheap.remove(spaceName, part, key.value(cctx.cacheObjectContext(), false), key.valueBytes(cctx.cacheObjectContext())); + if(val != null && cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapRemove(); + if (val != null) { if (c != null) c.apply(val); // Probably we should read value and apply closure before removing... @@ -1007,6 +1043,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (offheapEnabled) { offheap.put(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext()), entry.marshal()); + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapWrite(); + if (cctx.events().isRecordable(EVT_CACHE_OBJECT_TO_OFFHEAP)) cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid)null, null, EVT_CACHE_OBJECT_TO_OFFHEAP, null, false, null, true, null, null, null); @@ -1035,11 +1074,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { if (offheapEnabled) { for (GridCacheBatchSwapEntry swapEntry : swapped) { - offheap.put(spaceName, - swapEntry.partition(), - swapEntry.key(), - swapEntry.key().valueBytes(cctx.cacheObjectContext()), - swapEntry.marshal()); + offheap.put(spaceName, swapEntry.partition(), swapEntry.key(), + swapEntry.key().valueBytes(cctx.cacheObjectContext()), swapEntry.marshal()); + + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapWrite(); if (cctx.events().isRecordable(EVT_CACHE_OBJECT_TO_OFFHEAP)) cctx.events().addEvent(swapEntry.partition(), swapEntry.key(), cctx.nodeId(), @@ -1071,6 +1110,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { qryMgr.onSwap(batchSwapEntry.key()); } } + + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onSwapWrite(batch.size()); } } @@ -1082,17 +1124,15 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { * @param entry Entry bytes. * @throws IgniteCheckedException If failed. */ - private void writeToSwap(int part, - KeyCacheObject key, - byte[] entry) - throws IgniteCheckedException - { + private void writeToSwap(int part, KeyCacheObject key, byte[] entry) throws IgniteCheckedException { checkIteratorQueue(); swapMgr.write(spaceName, new SwapKey(key.value(cctx.cacheObjectContext(), false), part, key.valueBytes(cctx.cacheObjectContext())), - entry, - cctx.deploy().globalLoader()); + entry, cctx.deploy().globalLoader()); + + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onSwapWrite(); if (cctx.events().isRecordable(EVT_CACHE_OBJECT_SWAPPED)) cctx.events().addEvent(part, key, cctx.nodeId(), (IgniteUuid) null, null, @@ -1274,7 +1314,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); - offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + + if(rmv && cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapRemove(); } else it.removeX(); @@ -1432,6 +1475,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { return it.hasNext(); } + @SuppressWarnings("unchecked") @Override protected void onRemove() throws IgniteCheckedException { if (cur == null) throw new IllegalStateException("Method next() has not yet been called, or the remove() method " + @@ -1616,7 +1660,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); - offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + + if(rmv && cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapRemove(); } @Override protected void onClose() throws IgniteCheckedException { @@ -1646,7 +1693,10 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter { int part = cctx.affinity().partition(key); - offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + boolean rmv = offheap.removex(spaceName, part, key, key.valueBytes(cctx.cacheObjectContext())); + + if(rmv && cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onOffHeapRemove(); } }; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java index 5f9049a..9bd6321 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java @@ -43,7 +43,14 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { - if (cctx.kernalContext().isDaemon() || !cctx.config().isEagerTtl()) + boolean cleanupDisabled = cctx.kernalContext().isDaemon() || + !cctx.config().isEagerTtl() || + CU.isAtomicsCache(cctx.name()) || + CU.isMarshallerCache(cctx.name()) || + CU.isUtilityCache(cctx.name()) || + (cctx.kernalContext().clientNode() && cctx.config().getNearConfiguration() == null); + + if (cleanupDisabled) return; cleanupWorker = new CleanupWorker(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index 549f42f..3bd2a45 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.*; import org.apache.ignite.cache.*; import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.cache.store.*; import org.apache.ignite.cluster.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; @@ -34,12 +35,14 @@ import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; +import org.apache.ignite.lifecycle.*; import org.apache.ignite.plugin.*; import org.apache.ignite.transactions.*; import org.jetbrains.annotations.*; import org.jsr166.*; import javax.cache.*; +import javax.cache.configuration.*; import javax.cache.expiry.*; import javax.cache.integration.*; import java.io.*; @@ -114,13 +117,6 @@ public class GridCacheUtils { } }; - /** Not evicted partitions. */ - private static final IgnitePredicate PART_NOT_EVICTED = new P1() { - @Override public boolean apply(GridDhtLocalPartition p) { - return p.state() != GridDhtPartitionState.EVICTED; - } - }; - /** */ private static final IgniteClosure VER_ARR_FACTORY = new C1() { @@ -398,30 +394,11 @@ public class GridCacheUtils { * @return Partition to state transformer. */ @SuppressWarnings({"unchecked"}) - public static IgniteClosure part2state() { + public static IgniteClosure part2state() { return PART2STATE; } /** - * @return Not evicted partitions. - */ - @SuppressWarnings( {"unchecked"}) - public static IgnitePredicate notEvicted() { - return PART_NOT_EVICTED; - } - - /** - * Gets all nodes on which cache with the same name is started. - * - * @param ctx Cache context. - * @return All nodes on which cache with the same name is started (including nodes - * that may have already left). - */ - public static Collection allNodes(GridCacheContext ctx) { - return allNodes(ctx, AffinityTopologyVersion.NONE); - } - - /** * Gets all nodes on which cache with the same name is started. * * @param ctx Cache context. @@ -446,59 +423,6 @@ public class GridCacheUtils { } /** - * Gets alive nodes. - * - * @param ctx Cache context. - * @param topOrder Maximum allowed node order. - * @return Affinity nodes. - */ - public static Collection aliveNodes(final GridCacheContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().aliveCacheNodes(ctx.namex(), topOrder); - } - - /** - * Gets remote nodes on which cache with the same name is started. - * - * @param ctx Cache context. - * @return Remote nodes on which cache with the same name is started. - */ - public static Collection remoteNodes(final GridCacheContext ctx) { - return remoteNodes(ctx, AffinityTopologyVersion.NONE); - } - - /** - * Gets remote node with at least one cache configured. - * - * @param ctx Shared cache context. - * @return Collection of nodes with at least one cache configured. - */ - public static Collection remoteNodes(GridCacheSharedContext ctx) { - return remoteNodes(ctx, AffinityTopologyVersion.NONE); - } - - /** - * Gets remote nodes on which cache with the same name is started. - * - * @param ctx Cache context. - * @param topOrder Maximum allowed node order. - * @return Remote nodes on which cache with the same name is started. - */ - public static Collection remoteNodes(final GridCacheContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().remoteCacheNodes(ctx.namex(), topOrder); - } - - /** - * Gets alive nodes. - * - * @param ctx Cache context. - * @param topOrder Maximum allowed node order. - * @return Affinity nodes. - */ - public static Collection aliveRemoteNodes(final GridCacheContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().aliveRemoteCacheNodes(ctx.namex(), topOrder); - } - - /** * Gets remote nodes with at least one cache configured. * * @param ctx Cache shared context. @@ -510,25 +434,15 @@ public class GridCacheUtils { } /** - * Gets alive nodes with at least one cache configured. - * - * @param ctx Cache context. - * @param topOrder Maximum allowed node order. - * @return Affinity nodes. - */ - public static Collection aliveCacheNodes(final GridCacheSharedContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().aliveNodesWithCaches(topOrder); - } - - /** * Gets alive remote nodes with at least one cache configured. * * @param ctx Cache context. * @param topOrder Maximum allowed node order. * @return Affinity nodes. */ - public static Collection aliveRemoteCacheNodes(final GridCacheSharedContext ctx, AffinityTopologyVersion topOrder) { - return ctx.discovery().aliveRemoteNodesWithCaches(topOrder); + public static Collection aliveRemoteServerNodesWithCaches(final GridCacheSharedContext ctx, + AffinityTopologyVersion topOrder) { + return ctx.discovery().aliveRemoteServerNodesWithCaches(topOrder); } /** @@ -577,90 +491,34 @@ public class GridCacheUtils { } /** - * Checks if given node has specified cache started. - * - * @param cacheName Cache name. - * @param node Node to check. - * @return {@code True} if given node has specified cache started. - */ - public static boolean cacheNode(String cacheName, ClusterNode node) { - return cacheNode(cacheName, (GridCacheAttributes[])node.attribute(ATTR_CACHE)); - } - - /** - * Checks if given attributes relate the the node which has (or had) specified cache started. - * - * @param cacheName Cache name. - * @param caches Node cache attributes. - * @return {@code True} if given node has specified cache started. - */ - public static boolean cacheNode(String cacheName, GridCacheAttributes[] caches) { - if (caches != null) - for (GridCacheAttributes attrs : caches) - if (F.eq(cacheName, attrs.cacheName())) - return true; - - return false; - } - - /** - * Gets oldest alive node for specified topology version. - * - * @param cctx Cache context. - * @return Oldest node for the current topology version. - */ - public static ClusterNode oldest(GridCacheContext cctx) { - return oldest(cctx, AffinityTopologyVersion.NONE); - } - - /** - * Gets oldest alive node across nodes with at least one cache configured. - * - * @param ctx Cache context. - * @return Oldest node. - */ - public static ClusterNode oldest(GridCacheSharedContext ctx) { - return oldest(ctx, AffinityTopologyVersion.NONE); - } - - /** - * Gets oldest alive node for specified topology version. + * Gets oldest alive server node with at least one cache configured for specified topology version. * - * @param cctx Cache context. - * @param topOrder Maximum allowed node order. - * @return Oldest node for the given topology version. + * @param ctx Context. + * @param topVer Maximum allowed topology version. + * @return Oldest alive cache server node. */ - public static ClusterNode oldest(GridCacheContext cctx, AffinityTopologyVersion topOrder) { - ClusterNode oldest = null; + @Nullable public static ClusterNode oldestAliveCacheServerNode(GridCacheSharedContext ctx, + AffinityTopologyVersion topVer) { + Collection nodes = ctx.discovery().aliveServerNodesWithCaches(topVer); - for (ClusterNode n : aliveNodes(cctx, topOrder)) - if (oldest == null || n.order() < oldest.order()) - oldest = n; - - assert oldest != null : "Failed to find oldest node for cache context [name=" + cctx.name() + ", topOrder=" + topOrder + ']'; - assert oldest.order() <= topOrder.topologyVersion() || AffinityTopologyVersion.NONE.equals(topOrder); + if (nodes.isEmpty()) + return null; - return oldest; + return oldest(nodes); } /** - * Gets oldest alive node with at least one cache configured for specified topology version. - * - * @param cctx Shared cache context. - * @param topOrder Maximum allowed node order. + * @param nodes Nodes. * @return Oldest node for the given topology version. */ - public static ClusterNode oldest(GridCacheSharedContext cctx, AffinityTopologyVersion topOrder) { + @Nullable public static ClusterNode oldest(Collection nodes) { ClusterNode oldest = null; - for (ClusterNode n : aliveCacheNodes(cctx, topOrder)) { + for (ClusterNode n : nodes) { if (oldest == null || n.order() < oldest.order()) oldest = n; } - assert oldest != null : "Failed to find oldest node with caches: " + topOrder; - assert oldest.order() <= topOrder.topologyVersion() || AffinityTopologyVersion.NONE.equals(topOrder); - return oldest; } @@ -718,30 +576,6 @@ public class GridCacheUtils { } /** - * @return Closure that converts tx entry to key. - */ - @SuppressWarnings({"unchecked"}) - public static IgniteClosure tx2key() { - return (IgniteClosure)tx2key; - } - - /** - * @return Closure that converts tx entry collection to key collection. - */ - @SuppressWarnings({"unchecked"}) - public static IgniteClosure, Collection> txCol2Key() { - return (IgniteClosure, Collection>)txCol2key; - } - - /** - * @return Converts transaction entry to cache entry. - */ - @SuppressWarnings( {"unchecked"}) - public static IgniteClosure tx2entry() { - return (IgniteClosure)tx2entry; - } - - /** * @return Closure which converts transaction entry xid to XID version. */ @SuppressWarnings( {"unchecked"}) @@ -1451,13 +1285,7 @@ public class GridCacheUtils { } /** - * @return Cache ID for utility cache. - */ - public static int utilityCacheId() { - return cacheId(UTILITY_CACHE_NAME); - } - - /** + * @param cacheName Cache name. * @return Cache ID. */ public static int cacheId(String cacheName) { @@ -1688,7 +1516,7 @@ public class GridCacheUtils { /** * @param aff Affinity. * @param n Node. - * @return Predicate that evaulates to {@code true} if entry is primary for node. + * @return Predicate that evaluates to {@code true} if entry is primary for node. */ public static CacheEntryPredicate cachePrimary( final Affinity aff, @@ -1790,4 +1618,76 @@ public class GridCacheUtils { return res; } + + /** + * @param node Node. + * @return {@code True} if given node is client node (has flag {@link IgniteConfiguration#isClientMode()} set). + */ + public static boolean clientNode(ClusterNode node) { + Boolean clientModeAttr = node.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE); + + assert clientModeAttr != null : node; + + return clientModeAttr != null && clientModeAttr; + } + + /** + * @param node Node. + * @param filter Node filter. + * @return {@code True} if node is not client node and pass given filter. + */ + public static boolean affinityNode(ClusterNode node, IgnitePredicate filter) { + return !clientNode(node) && filter.apply(node); + } + + /** + * Creates and starts store session listeners. + * + * @param ctx Kernal context. + * @param factories Factories. + * @return Listeners. + * @throws IgniteCheckedException In case of error. + */ + public static Collection startStoreSessionListeners(GridKernalContext ctx, + Factory[] factories) throws IgniteCheckedException { + if (factories == null) + return null; + + Collection lsnrs = new ArrayList<>(factories.length); + + for (Factory factory : factories) { + CacheStoreSessionListener lsnr = factory.create(); + + if (lsnr != null) { + ctx.resource().injectGeneric(lsnr); + + if (lsnr instanceof LifecycleAware) + ((LifecycleAware)lsnr).start(); + + lsnrs.add(lsnr); + } + } + + return lsnrs; + } + + /** + * Stops store session listeners. + * + * @param ctx Kernal context. + * @param sesLsnrs Session listeners. + * @throws IgniteCheckedException In case of error. + */ + public static void stopStoreSessionListeners(GridKernalContext ctx, Collection sesLsnrs) + throws IgniteCheckedException { + if (sesLsnrs == null) + return; + + for (CacheStoreSessionListener lsnr : sesLsnrs) { + if (lsnr instanceof LifecycleAware) + ((LifecycleAware)lsnr).stop(); + + ctx.resource().cleanupGeneric(lsnr); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index f840015..4390993 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -699,6 +699,29 @@ public class IgniteCacheProxy extends AsyncSupportAdapter getAllOutTx(Set keys) { + try { + CacheOperationContext prev = onEnter(opCtx); + + try { + if (isAsync()) { + setFuture(delegate.getAllOutTxAsync(keys)); + + return null; + } + else + return delegate.getAllOutTx(keys); + } + finally { + onLeave(prev); + } + } + catch (IgniteCheckedException e) { + throw cacheException(e); + } + } + /** * @param keys Keys. * @return Values map. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index 5184115..d98379c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -775,6 +775,11 @@ public interface IgniteInternalCache extends Iterable> { public Set keySet(); /** + * @return Set of keys including internal keys. + */ + public Set keySetx(); + + /** * Set of keys for which this node is primary. * This set is dynamic and may change with grid topology changes. * Note that this set will contain mappings for all keys, even if their values are @@ -1618,7 +1623,16 @@ public interface IgniteInternalCache extends Iterable> { * @return Value. * @throws IgniteCheckedException If failed. */ - @Nullable public Map getAllOutTx(List keys) throws IgniteCheckedException; + public Map getAllOutTx(Set keys) throws IgniteCheckedException; + + /** + * Gets values from cache. Will bypass started transaction, if any, i.e. will not enlist entries + * and will not lock any keys if pessimistic transaction is started by thread. + * + * @param keys Keys to get values for. + * @return Future for getAllOutTx operation. + */ + public IgniteInternalFuture> getAllOutTxAsync(Set keys); /** * Checks whether this cache is IGFS data cache. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java index 0186a90..0790052 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/affinity/GridCacheAffinityImpl.java @@ -84,9 +84,7 @@ public class GridCacheAffinityImpl implements Affinity { @Override public int[] primaryPartitions(ClusterNode n) { A.notNull(n, "n"); - AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion()); - - Set parts = cctx.affinity().primaryPartitions(n.id(), topVer); + Set parts = cctx.affinity().primaryPartitions(n.id(), topologyVersion()); return U.toIntArray(parts); } @@ -95,9 +93,7 @@ public class GridCacheAffinityImpl implements Affinity { @Override public int[] backupPartitions(ClusterNode n) { A.notNull(n, "n"); - AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion()); - - Set parts = cctx.affinity().backupPartitions(n.id(), topVer); + Set parts = cctx.affinity().backupPartitions(n.id(), topologyVersion()); return U.toIntArray(parts); } @@ -108,7 +104,7 @@ public class GridCacheAffinityImpl implements Affinity { Collection parts = new HashSet<>(); - AffinityTopologyVersion topVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion()); + AffinityTopologyVersion topVer = topologyVersion(); for (int partsCnt = partitions(), part = 0; part < partsCnt; part++) { for (ClusterNode affNode : cctx.affinity().nodes(part, topVer)) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java index fa8d192..b5c5161 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java @@ -218,7 +218,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter { } }, new QueueHeaderPredicate(), - cctx.isLocal() || cctx.isReplicated(), + cctx.isLocal() || (cctx.isReplicated() && cctx.affinityNode()), true); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java index b79f9d5..bd72764 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java @@ -327,13 +327,6 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry { } /** - * - */ - public void onUnlock() { - // No-op. - } - - /** * Unlocks local lock. * * @return Removed candidate, or null if thread still holds the lock. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java index fded3c9..bd1dedf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java @@ -63,6 +63,9 @@ public class GridDistributedTxMapping implements Externalizable { /** {@code True} if mapping is for near caches, {@code false} otherwise. */ private boolean near; + /** {@code True} if this is first mapping for optimistic tx on client node. */ + private boolean clientFirst; + /** * Empty constructor required for {@link Externalizable}. */ @@ -108,6 +111,20 @@ public class GridDistributedTxMapping implements Externalizable { } /** + * @return {@code True} if this is first mapping for optimistic tx on client node. + */ + public boolean clientFirst() { + return clientFirst; + } + + /** + * @param clientFirst {@code True} if this is first mapping for optimistic tx on client node. + */ + public void clientFirst(boolean clientFirst) { + this.clientFirst = clientFirst; + } + + /** * @return {@code True} if mapping is for near caches, {@code false} otherwise. */ public boolean near() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 331de4e..c3f3e7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -210,7 +210,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { removeNode(exchId.nodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldest(cctx, topVer); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer); + + assert oldest != null; if (log.isDebugEnabled()) log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); @@ -218,7 +220,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); // If this is the oldest node. - if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId)) { + if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) { if (node2part == null) { node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); @@ -665,7 +667,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { assert nodeId.equals(cctx.localNodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldest(cctx, topVer); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer); // If this node became the oldest node. if (oldest.id().equals(cctx.localNodeId())) { @@ -715,7 +717,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { assert nodeId != null; assert lock.writeLock().isHeldByCurrentThread(); - ClusterNode oldest = CU.oldest(cctx, topVer); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, topVer); ClusterNode loc = cctx.localNode(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java index 303d649..7bae7f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAssignmentFetchFuture.java @@ -96,12 +96,12 @@ public class GridDhtAssignmentFetchFuture extends GridFutureAdapter extends GridCompoundIdentityFuture +public final class GridDhtLockFuture extends GridCompoundIdentityFuture implements GridCacheMvccFuture, GridDhtFuture, GridCacheMappedVersion { /** */ private static final long serialVersionUID = 0L; @@ -60,7 +60,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture cctx; + private GridCacheContext cctx; /** Near node ID. */ private UUID nearNodeId; @@ -151,7 +151,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture cctx, + GridCacheContext cctx, UUID nearNodeId, GridCacheVersion nearLockVer, @NotNull AffinityTopologyVersion topVer, @@ -221,7 +221,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture cacheCtx, int invalidPart) { + void addInvalidPartition(GridCacheContext cacheCtx, int invalidPart) { invalidParts.add(invalidPart); // Register invalid partitions with transaction. @@ -1170,7 +1170,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture cacheCtx, Collection keys, UUID nodeId, long msgId, + private void evictReaders(GridCacheContext cacheCtx, Collection keys, UUID nodeId, long msgId, @Nullable List entries) { if (entries == null || keys == null || entries.isEmpty() || keys.isEmpty()) return; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 073e0e7..374ab87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -41,7 +41,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh * Partition topology. */ @GridToStringExclude -class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { +class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** If true, then check consistency. */ private static final boolean CONSISTENCY_CHECK = false; @@ -49,7 +49,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { private static final boolean FULL_MAP_DEBUG = false; /** Context. */ - private final GridCacheContext cctx; + private final GridCacheContext cctx; /** Logger. */ private final IgniteLogger log; @@ -85,7 +85,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** * @param cctx Context. */ - GridDhtPartitionTopologyImpl(GridCacheContext cctx) { + GridDhtPartitionTopologyImpl(GridCacheContext cctx) { assert cctx != null; this.cctx = cctx; @@ -239,7 +239,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { removeNode(exchId.nodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldest(cctx.shared(), topVer); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); + + assert oldest != null; if (log.isDebugEnabled()) log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); @@ -247,7 +249,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { long updateSeq = this.updateSeq.incrementAndGet(); // If this is the oldest node. - if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId())) { + if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion())) { if (node2part == null) { node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); @@ -274,7 +276,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (cctx.rebalanceEnabled()) { for (int p = 0; p < num; p++) { // If this is the first node in grid. - boolean added = exchFut.isCacheAdded(cctx.cacheId()); + boolean added = exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()); if ((oldest.id().equals(loc.id()) && oldest.id().equals(exchId.nodeId()) && exchId.isJoined()) || added) { assert exchId.isJoined() || added; @@ -604,7 +606,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { try { return new GridDhtPartitionMap(cctx.nodeId(), updateSeq.get(), - F.viewReadOnly(locParts, CU.part2state()), true); + F.viewReadOnly(locParts, CU.part2state()), true); } finally { lock.readLock().unlock(); @@ -660,13 +662,15 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @return List of nodes for the partition. */ private List nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) { - Collection allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.allNodes(cctx, topVer)) : null; + Collection allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null; lock.readLock().lock(); try { assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer + - ", allIds=" + allIds + ", node2part=" + node2part + ']'; + ", allIds=" + allIds + + ", node2part=" + node2part + + ", cache=" + cctx.name() + ']'; Collection nodeIds = part2node.get(p); @@ -738,7 +742,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { try { assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part + - ", locNodeId=" + cctx.localNode().id() + ", locName=" + cctx.gridName() + ']'; + ", cache=" + cctx.name() + + ", started=" + cctx.started() + + ", stopping=" + stopping + + ", locNodeId=" + cctx.localNode().id() + + ", locName=" + cctx.gridName() + ']'; GridDhtPartitionFullMap m = node2part; @@ -756,6 +764,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (log.isDebugEnabled()) log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']'); + assert partMap != null; + lock.writeLock().lock(); try { @@ -1024,7 +1034,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { assert nodeId.equals(cctx.nodeId()); // In case if node joins, get topology at the time of joining node. - ClusterNode oldest = CU.oldest(cctx, topVer); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); + + assert oldest != null; // If this node became the oldest node. if (oldest.id().equals(cctx.nodeId())) { @@ -1074,7 +1086,9 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { assert nodeId != null; assert lock.writeLock().isHeldByCurrentThread(); - ClusterNode oldest = CU.oldest(cctx, topVer); + ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer); + + assert oldest != null; ClusterNode loc = cctx.localNode();