Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9AB39200BF8 for ; Thu, 29 Dec 2016 10:37:11 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 99814160B40; Thu, 29 Dec 2016 09:37:11 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 5E351160B4A for ; Thu, 29 Dec 2016 10:37:07 +0100 (CET) Received: (qmail 82526 invoked by uid 500); 29 Dec 2016 09:37:06 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 82229 invoked by uid 99); 29 Dec 2016 09:37:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 29 Dec 2016 09:37:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DC34AF3519; Thu, 29 Dec 2016 09:37:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.apache.org Date: Thu, 29 Dec 2016 09:37:16 -0000 Message-Id: <947f5e7aca02420fa8234bc17ed6788a@git.apache.org> In-Reply-To: <9ed165a678e44bff8ed6ffb3e90a45ab@git.apache.org> References: <9ed165a678e44bff8ed6ffb3e90a45ab@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [12/50] [abbrv] ignite git commit: Merge with master - WIP. archived-at: Thu, 29 Dec 2016 09:37:11 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/Ignite.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 4cbf50e,4d85c54..cb19875 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@@ -471,23 -483,34 +493,50 @@@ public final class IgniteSystemProperti @Deprecated public static final String IGNITE_BINARY_DONT_WRAP_TREE_STRUCTURES = "IGNITE_BINARY_DONT_WRAP_TREE_STRUCTURES"; + /** */ + public static final String IGNITE_IO_BALANCE_PERIOD = "IGNITE_IO_BALANCE_PERIOD"; + + /** + * When set to {@code true} fields are written by BinaryMarshaller in sorted order. Otherwise + * the natural order is used. + *

+ * @deprecated Should be removed in Apache Ignite 2.0. + */ + @Deprecated + public static final String IGNITE_BINARY_SORT_OBJECT_FIELDS = "IGNITE_BINARY_SORT_OBJECT_FIELDS"; + + /** + * Whether Ignite can access unaligned memory addresses. + *

+ * Defaults to {@code} false, meaning that unaligned access will be performed only on x86 architecture. + */ + public static final String IGNITE_MEMORY_UNALIGNED_ACCESS = "IGNITE_MEMORY_UNALIGNED_ACCESS"; + + /** + * When unsafe memory copy if performed below this threshold, Ignite will do it on per-byte basis instead of + * calling to Unsafe.copyMemory(). + *

+ * Defaults to 0, meaning that threshold is disabled. + */ + public static final String IGNITE_MEMORY_PER_BYTE_COPY_THRESHOLD = "IGNITE_MEMORY_PER_BYTE_COPY_THRESHOLD"; + /** + * When set to {@code true} fields are written by BinaryMarshaller in sorted order. Otherwise + * the natural order is used. + *

+ * @deprecated Should be removed in Apache Ignite 2.0. + */ + @Deprecated + public static final String IGNITE_BINARY_SORT_OBJECT_FIELDS = "IGNITE_BINARY_SORT_OBJECT_FIELDS"; + + /** + * Whether Ignite can access unaligned memory addresses. + *

+ * Defaults to {@code} false, meaning that unaligned access will be performed only on x86 architecture. + */ + public static final String IGNITE_MEMORY_UNALIGNED_ACCESS = "IGNITE_MEMORY_UNALIGNED_ACCESS"; + + /** * Enforces singleton. */ private IgniteSystemProperties() { http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/cache/affinity/fair/FairAffinityFunction.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index e15c989,56fc5b4..f24a940 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@@ -329,6 -341,12 +336,9 @@@ public class CacheConfiguration e /** Maximum batch size for write-behind cache store. */ private int writeBehindBatchSize = DFLT_WRITE_BEHIND_BATCH_SIZE; + /** Maximum number of query iterators that can be stored. */ + private int maxQryIterCnt = DFLT_MAX_QUERY_ITERATOR_CNT; + - /** Memory mode. */ - private CacheMemoryMode memMode = DFLT_MEMORY_MODE; - /** */ private AffinityKeyMapper affMapper; @@@ -454,9 -472,10 +467,10 @@@ name = cc.getName(); nearCfg = cc.getNearConfiguration(); nodeFilter = cc.getNodeFilter(); + partitionLossPolicy = cc.getPartitionLossPolicy(); pluginCfgs = cc.getPluginConfigurations(); qryEntities = cc.getQueryEntities() == Collections.emptyList() ? null : cc.getQueryEntities(); + qryDetailMetricsSz = cc.getQueryDetailMetricsSize(); readFromBackup = cc.isReadFromBackup(); rebalanceBatchSize = cc.getRebalanceBatchSize(); rebalanceBatchesPrefetchCount = cc.getRebalanceBatchesPrefetchCount(); @@@ -1604,8 -1646,35 +1618,33 @@@ } /** + * Gets maximum number of query iterators that can be stored. Iterators are stored to + * support query pagination when each page of data is sent to user's node only on demand. + * Increase this property if you are running and processing lots of queries in parallel. + *

+ * Default value is {@link #DFLT_MAX_QUERY_ITERATOR_CNT}. + * + * @return Maximum number of query iterators that can be stored. + */ + public int getMaxQueryIteratorsCount() { + return maxQryIterCnt; + } + + /** + * Sets maximum number of query iterators that can be stored. + * + * @param maxQryIterCnt Maximum number of query iterators that can be stored. + * @return {@code this} for chaining. + */ + public CacheConfiguration setMaxQueryIteratorsCount(int maxQryIterCnt) { + this.maxQryIterCnt = maxQryIterCnt; + + return this; + } + + /** * Gets memory mode for cache. Memory mode helps control whether value is stored in on-heap memory, * off-heap memory, or swap space. Refer to {@link CacheMemoryMode} for more info. - *

- * Default value is {@link #DFLT_MEMORY_MODE}. * * @return Memory mode. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 3f83972,927944f..cee71bb --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@@ -48,8 -50,10 +49,9 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.job.GridJobProcessor; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor; import org.apache.ignite.internal.processors.odbc.OdbcProcessor; -import org.apache.ignite.internal.processors.offheap.GridOffHeapProcessor; import org.apache.ignite.internal.processors.platform.PlatformProcessor; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; + import org.apache.ignite.internal.processors.pool.PoolProcessor; import org.apache.ignite.internal.processors.port.GridPortProcessor; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.processors.resource.GridResourceProcessor; http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 42084e6,a2ad1b2..6a4547d --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@@ -65,8 -67,10 +66,9 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor; import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor; import org.apache.ignite.internal.processors.odbc.OdbcProcessor; -import org.apache.ignite.internal.processors.offheap.GridOffHeapProcessor; import org.apache.ignite.internal.processors.platform.PlatformProcessor; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; + import org.apache.ignite.internal.processors.pool.PoolProcessor; import org.apache.ignite.internal.processors.port.GridPortProcessor; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.processors.resource.GridResourceProcessor; http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java index cbd7dd0,24ddcd2..7c1c484 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java @@@ -97,10 -100,7 +100,10 @@@ public enum GridTopic TOPIC_TX, /** */ - TOPIC_BACKUP, - TOPIC_IO_TEST; ++ TOPIC_IO_TEST, + + /** */ - TOPIC_IO_TEST; ++ TOPIC_BACKUP; /** Enum values. */ private static final GridTopic[] VALS = values(); http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 29691e6,4972d1f..0d99f4c --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@@ -813,8 -835,15 +828,12 @@@ public class IgniteKernal implements Ig addHelper(IGFS_HELPER.create(F.isEmpty(cfg.getFileSystemConfiguration()))); + addHelper(HADOOP_HELPER.createIfInClassPath(ctx, false)); + startProcessor(new IgnitePluginProcessor(ctx, cfg, plugins)); + startProcessor(new PoolProcessor(ctx)); + - // Off-heap processor has no dependencies. - startProcessor(new GridOffHeapProcessor(ctx)); - // Closure processor should be started before all others // (except for resource processor), as many components can depend on it. startProcessor(new GridClosureProcessor(ctx)); http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java index f8a20c3,f32a753..da24e40 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java @@@ -93,7 -94,10 +95,8 @@@ import org.apache.ignite.spi.discovery. import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi; import org.apache.ignite.spi.failover.always.AlwaysFailoverSpi; import org.apache.ignite.spi.indexing.noop.NoopIndexingSpi; + import org.apache.ignite.spi.loadbalancing.LoadBalancingSpi; import org.apache.ignite.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi; -import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi; -import org.apache.ignite.spi.swapspace.noop.NoopSwapSpaceSpi; import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; import org.apache.ignite.thread.IgniteThread; import org.apache.ignite.thread.IgniteThreadPoolExecutor; @@@ -1627,8 -1641,11 +1640,10 @@@ public class IgnitionEx ensureMultiInstanceSupport(myCfg.getCollisionSpi()); ensureMultiInstanceSupport(myCfg.getFailoverSpi()); ensureMultiInstanceSupport(myCfg.getLoadBalancingSpi()); - ensureMultiInstanceSupport(myCfg.getSwapSpaceSpi()); } + validateThreadPoolSize(cfg.getPublicThreadPoolSize(), "public"); + execSvc = new IgniteThreadPoolExecutor( "pub", cfg.getGridName(), http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java index 8624c64,69de3f2..7f3f5e4 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java @@@ -250,29 -248,7 +256,29 @@@ public class BinaryEnumObjectImpl imple /** {@inheritDoc} */ @Override public byte[] valueBytes(CacheObjectContext cacheCtx) throws IgniteCheckedException { - return U.marshal(ctx.marshaller(), this); + if (valBytes != null) + return valBytes; + - valBytes = ctx.marshaller().marshal(this); ++ valBytes = U.marshal(ctx.marshaller(), this); + + return valBytes; + } + + /** {@inheritDoc} */ + @Override public boolean putValue(ByteBuffer buf) throws IgniteCheckedException { + assert valBytes != null : "Value bytes must be initialized before object is stored"; + + return putValue(buf, 0, objectPutSize(valBytes.length)); + } + + /** {@inheritDoc} */ + @Override public boolean putValue(final ByteBuffer buf, int off, int len) throws IgniteCheckedException { + return CacheObjectAdapter.putValue(cacheObjectType(), buf, off, len, valBytes, 0); + } + + /** {@inheritDoc} */ + @Override public int valueBytesLength(CacheObjectContext ctx) throws IgniteCheckedException { + return objectPutSize(valueBytes(ctx).length); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldImpl.java index 4b59904,59e79fb..477ae37 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryFieldImpl.java @@@ -60,20 -47,11 +60,19 @@@ public class BinaryFieldImpl implement * @param fieldName Field name. * @param fieldId Field ID. */ - public BinaryFieldImpl(int typeId, BinarySchemaRegistry schemas, String fieldName, int fieldId) { + public BinaryFieldImpl( + BinaryContext ctx, + int typeId, + BinarySchemaRegistry schemas, + String fieldName, + int fieldId + ) { + assert ctx != null; assert typeId != 0; assert schemas != null; - assert fieldName != null; assert fieldId != 0; + this.ctx = ctx; this.typeId = typeId; this.schemas = schemas; this.fieldName = fieldName; http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java index e6d8781,b80f573..33c7f08 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java @@@ -18,17 -18,20 +18,21 @@@ package org.apache.ignite.internal.binary; import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.IdentityHashMap; + import java.util.Iterator; + import java.util.Map; import org.apache.ignite.IgniteException; + import org.apache.ignite.binary.BinaryArrayIdentityResolver; + import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.binary.BinaryObjectBuilder; - import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl; - import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; - import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.binary.BinaryObjectException; + import org.apache.ignite.binary.BinaryIdentityResolver; import org.apache.ignite.binary.BinaryType; - import org.apache.ignite.binary.BinaryObject; + import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl; + import org.apache.ignite.internal.util.typedef.internal.SB; + import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; /** @@@ -75,23 -78,35 +79,44 @@@ public abstract class BinaryObjectExImp } /** + * Get offset of data begin. + * + * @return Field value. + */ + public abstract int dataStartOffset(); + + /** + * Get offset of the footer begin. + * + * @return Field value. + */ + public abstract int footerStartOffset(); + + /** * Get field by offset. * - * @param fieldOffset Field offset. + * @param order Field offset. * @return Field value. */ - @Nullable protected abstract F fieldByOrder(int fieldOffset); + @Nullable public abstract F fieldByOrder(int order); + + /** + * Create field comparer. + * + * @return Comparer. + */ + public abstract BinarySerializedFieldComparator createFieldComparator(); /** + * Writes field value defined by the given field offset to the given byte buffer. + * + * @param fieldOffset Field offset. + * @return Boolean flag indicating whether the field was successfully written to the buffer, {@code false} + * if there is no enough space for the field in the buffer. + */ + protected abstract boolean writeFieldByOrder(int fieldOffset, ByteBuffer buf); + + /** * @param ctx Reader context. * @param fieldName Field name. * @return Field value. http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index bb7bf69,7ef7bc0..e0ff2a6 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@@ -967,8 -804,17 +804,27 @@@ public class GridIoManager extends Grid } } + if (ctx.config().getStripedPoolSize() > 0 && + plc == GridIoPolicy.SYSTEM_POOL && + msg.partition() != Integer.MIN_VALUE + ) { + ctx.getStripedExecutorService().execute(msg.partition(), c); + + return; + } + ++ if (msg.topicOrdinal() == TOPIC_IO_TEST.ordinal()) { ++ IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg.message(); ++ ++ if (msg0.processFromNioThread()) { ++ c.run(); ++ ++ return; ++ } ++ } ++ try { - pool(plc).execute(c); + pools.poolForPolicy(plc).execute(c); } catch (RejectedExecutionException e) { U.error(log, "Failed to process regular message due to execution rejection. Increase the upper bound " + http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index f5c46a8,b1fe910..63d84c4 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@@ -161,11 -171,56 +172,61 @@@ public class GridIoMessageFactory imple Message msg = null; switch (type) { + case -44: + msg = new TcpCommunicationSpi.HandshakeMessage2(); + + break; + + case -43: + msg = new IgniteIoTestMessage(); + + break; + + case -42: + msg = new HadoopDirectShuffleMessage(); + + break; + + case -41: + msg = new HadoopShuffleFinishResponse(); + + break; + + case -40: + msg = new HadoopShuffleFinishRequest(); + + break; + + case -39: + msg = new HadoopJobId(); + + break; + + case -38: + msg = new HadoopShuffleAck(); + + break; + + case -37: + msg = new HadoopShuffleMessage(); + + break; + + case -36: + msg = new GridDhtAtomicSingleUpdateRequest(); + + break; + + case -27: + msg = new GridDhtTxOnePhaseCommitAckRequest(); + + break; + + case -27: + msg = new BackupFinishedMessage(); + + break; + case -26: msg = new TxLockList(); @@@ -757,17 -812,21 +818,33 @@@ break; case 125: + msg = new GridNearAtomicSingleUpdateRequest(); + + break; + + case 126: + msg = new GridNearAtomicSingleUpdateInvokeRequest(); + + break; + + case 127: + msg = new GridNearAtomicSingleUpdateFilterRequest(); + + break; + + // [-3..119] [124..127] [-36..-44]- this ++ case 125: + msg = new TcpCommunicationSpi.HandshakeMessage2(); + + break; + + // [-3..119] [124-125] - this + case 126: + msg = new IgniteIoTestMessage(); + + break; + + // [-3..119] [124-126] - this // [120..123] - DR // [-4..-22, -30..-35] - SQL default: http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java index 985cf99,77aaa09..969ea39 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java @@@ -53,27 -54,28 +53,28 @@@ public class IgniteIoTestMessage implem } /** - * @param id ID. - * @param req {@code True} for request. + * @param id Message ID. + * @param req Request flag. * @param payload Payload. */ - IgniteIoTestMessage(long id, boolean req, byte[] payload) { + public IgniteIoTestMessage(long id, boolean req, byte[] payload) { this.id = id; this.req = req; this.payload = payload; } /** - * @return Process from NIO thread flag. + * @return {@code True} if message should be processed from NIO thread + * (otherwise message is submitted to system pool). */ - boolean processFromNioThread() { + public boolean processFromNioThread() { return isFlag(FLAG_PROC_FROM_NIO); } /** - * @param procFromNioThread Process from NIO thread flag. + * @param procFromNioThread {@code True} if message should be processed from NIO thread. */ - void processFromNioThread(boolean procFromNioThread) { + public void processFromNioThread(boolean procFromNioThread) { setFlag(procFromNioThread, FLAG_PROC_FROM_NIO); } http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index c36262f,88aa4e0..4c62371 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@@ -1942,10 -1882,9 +1957,10 @@@ public abstract class GridCacheAdapter< @Nullable final UUID subjId, final String taskName, final boolean deserializeBinary, - @Nullable IgniteCacheExpiryPolicy expiry, + @Nullable final IgniteCacheExpiryPolicy expiry, final boolean skipVals, final boolean keepCacheObjects, + final boolean recovery, boolean canRemap, final boolean needVer ) { http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index db10484,52b779d..5f45024 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@@ -34,15 -35,15 +35,15 @@@ import org.apache.ignite.IgniteCheckedE import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheAtomicWriteOrderMode; -import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.eviction.EvictableEntry; -import org.apache.ignite.internal.binary.BinaryObjectOffheapImpl; -import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; -import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; +import org.apache.ignite.internal.pagemem.wal.StorageException; +import org.apache.ignite.internal.pagemem.wal.record.DataEntry; +import org.apache.ignite.internal.pagemem.wal.record.DataRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; - import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture; + import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras; import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras; @@@ -891,7 -1196,14 +894,7 @@@ public abstract class GridCacheMapEntr assert newVer != null : "Failed to get write version for tx: " + tx; - old = this.val; - boolean internal = isInternal() || !context().userCache(); - - Map lsnrCol = - notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null; - - old = oldValPresent ? oldVal : - (retval || intercept || lsnrCol != null) ? - rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : this.val; ++ old = oldValPresent ? oldVal : this.val; if (intercept) { val0 = cctx.unwrapBinaryIfNeeded(val, keepBinary, false); @@@ -1083,10 -1409,8 +1088,10 @@@ Map lsnrCol = notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null; - old = oldValPresent ? oldVal : (retval || intercept || lsnrCol != null) ? - rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val; + if (startVer && (retval || intercept || lsnrCol != null)) + unswap(); + - old = val; ++ old = oldValPresent ? oldVal : val; if (intercept) { entry0 = new CacheLazyEntry(cctx, key, old, keepBinary); @@@ -3122,35 -3559,47 +3125,35 @@@ if (curVer == null || curVer.equals(ver)) { if (val != this.val) { -- GridCacheMvcc mvcc = mvccExtras(); ++ GridCacheMvcc mvcc = mvccExtras(); -- if (mvcc != null && !mvcc.isEmpty()) -- return null; ++ if (mvcc != null && !mvcc.isEmpty()) ++ return null; -- if (newVer == null) -- newVer = cctx.versions().next(); ++ if (newVer == null) ++ newVer = cctx.versions().next(); - long ttl = ttlExtras(); - CacheObject old = rawGetOrUnmarshalUnlocked(false); ++ long ttl = ttlExtras(); - long expTime = CU.toExpireTime(ttl); - long ttl; - long expTime; ++ long expTime = CU.toExpireTime(ttl); - // Detach value before index update. - val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); - if (loadExpiryPlc != null) { - IgniteBiTuple initTtlAndExpireTime = initialTtlAndExpireTime(loadExpiryPlc); ++ // Detach value before index update. ++ val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); - if (val != null) { - storeValue(val, expTime, newVer); - ttl = initTtlAndExpireTime.get1(); - expTime = initTtlAndExpireTime.get2(); - } - else { - ttl = ttlExtras(); - expTime = expireTimeExtras(); - } ++ if (val != null) { ++ storeValue(val, expTime, newVer); - if (deletedUnlocked()) - deletedUnlocked(false); - } - // Detach value before index update. - val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); ++ if (deletedUnlocked()) ++ deletedUnlocked(false); ++ } - // Version does not change for load ops. - update(val, expTime, ttl, newVer, true); - if (val != null) { - updateIndex(val, expTime, newVer, old); ++ // Version does not change for load ops. ++ update(val, expTime, ttl, newVer, true); - return newVer; - } - } - if (deletedUnlocked()) - deletedUnlocked(false); ++ return newVer; ++ } + } - // Version does not change for load ops. - update(val, expTime, ttl, newVer, true); - - return newVer; - } - } - return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index d03a90a,8ea2169..40035c0 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@@ -54,16 -56,18 +56,18 @@@ import org.apache.ignite.internal.Ignit import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.events.DiscoveryCustomEvent; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.pagemem.backup.StartFullBackupAckDiscoveryMessage; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; + import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; + import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; @@@ -751,9 -750,17 +769,15 @@@ public class GridCachePartitionExchange if (log.isDebugEnabled()) log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + ']'); - Collection rmts; - // If this is the oldest node. if (oldest.id().equals(cctx.localNodeId())) { + GridDhtPartitionsExchangeFuture lastFut = lastInitializedFut; + + // No need to send to nodes which did not finish their first exchange. + AffinityTopologyVersion rmtTopVer = + lastFut != null ? lastFut.topologyVersion() : AffinityTopologyVersion.NONE; + - rmts = CU.remoteNodes(cctx, rmtTopVer); + Collection rmts = CU.remoteNodes(cctx, AffinityTopologyVersion.NONE); if (log.isDebugEnabled()) log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId()); @@@ -813,24 -935,11 +964,25 @@@ * @param id ID. */ private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) { - GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, + GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node, + id, cctx.kernalContext().clientNode(), - cctx.versions().last()); + false); + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { + if (!cacheCtx.isLocal()) { + GridDhtPartitionMap2 locMap = cacheCtx.topology().localPartitionMap(); + + m.addLocalPartitionMap(cacheCtx.cacheId(), locMap); + } + } + + for (GridClientPartitionTopology top : clientTops.values()) { + GridDhtPartitionMap2 locMap = top.localPartitionMap(); + + m.addLocalPartitionMap(top.cacheId(), locMap); + } + if (log.isDebugEnabled()) log.debug("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']'); @@@ -1395,17 -1594,13 +1644,13 @@@ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { long timeout = cctx.gridConfig().getNetworkTimeout(); - boolean startEvtFired = false; - int cnt = 0; - IgniteInternalFuture asyncStartFut = null; - while (!isCancelled()) { - GridDhtPartitionsExchangeFuture exchFut = null; - cnt++; + GridDhtPartitionsExchangeFuture exchFut = null; + try { boolean preloadFinished = true; @@@ -1580,27 -1762,26 +1813,34 @@@ for (Integer cacheId : orderMap.get(order)) { GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId); + + if (assigns != null) + assignsCancelled |= assigns.cancelled(); + + List waitList = new ArrayList<>(size - 1); + + for (List cIds : orderMap.headMap(order).values()) { + for (Integer cId : cIds) + waitList.add(cctx.cacheContext(cId).name()); + } + - Callable r = cacheCtx.preloader().addAssignments(assignsMap.get(cacheId), + // Cancels previous rebalance future (in case it's not done yet). + // Sends previous rebalance stopped event (if necessary). + // Creates new rebalance future. + // Sends current rebalance started event (if necessary). + // Finishes cache sync future (on empty assignments). + Runnable cur = cacheCtx.preloader().addAssignments(assigns, forcePreload, + waitList, cnt, + r, exchFut.forcedRebalanceFuture()); - if (r != null) { - U.log(log, "Cache rebalancing scheduled: [cache=" + cacheCtx.name() + - ", waitList=" + waitList.toString() + "]"); + if (cur != null) { + rebList.add(U.maskName(cacheCtx.name())); - if (cacheId == CU.cacheId(GridCacheUtils.MARSH_CACHE_NAME)) - marshR = r; - else - orderedRs.add(r); + r = cur; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 1ea2e12,0c28691..694e20f --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@@ -84,14 -84,14 +84,15 @@@ public interface GridCachePreloader * * @param assignments Assignments to add. * @param forcePreload Force preload flag. - * @param caches Rebalancing of these caches will be finished before this started. * @param cnt Counter. - * @return Rebalancing closure. + * @param next Runnable responsible for cache rebalancing start. + * @return Rebalancing runnable. */ - public Callable addAssignments(GridDhtPreloaderAssignments assignments, + public Runnable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, + Collection caches, int cnt, + Runnable next, @Nullable GridFutureAdapter forcedRebFut); /** http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index 84e7727,d7ec288..d3eeed9 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@@ -160,8 -166,11 +166,12 @@@ public class GridCachePreloaderAdapter } /** {@inheritDoc} */ - @Override public Callable addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload, - Collection caches, int cnt, @Nullable GridFutureAdapter forcedRebFut) { + @Override public Runnable addAssignments(GridDhtPreloaderAssignments assignments, + boolean forcePreload, ++ Collection caches, + int cnt, + Runnable next, + @Nullable GridFutureAdapter forcedRebFut) { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 179f5f8,0be2072..881d257 mode 100644,100755..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@@ -1913,20 -1867,9 +1915,21 @@@ public class GridCacheProcessor extend GridCacheVersionManager verMgr = new GridCacheVersionManager(); GridCacheDeploymentManager depMgr = new GridCacheDeploymentManager(); GridCachePartitionExchangeManager exchMgr = new GridCachePartitionExchangeManager(); + + IgniteCacheDatabaseSharedManager dbMgr = ctx.plugins().createComponent(IgniteCacheDatabaseSharedManager.class); + + if (dbMgr == null) + dbMgr = new IgniteCacheDatabaseSharedManager(); + + IgnitePageStoreManager pageStoreMgr = ctx.plugins().createComponent(IgnitePageStoreManager.class); + IgniteWriteAheadLogManager walMgr = ctx.plugins().createComponent(IgniteWriteAheadLogManager.class); + + if (walMgr == null) + walMgr = new IgniteWriteAheadLogNoopManager(); + GridCacheIoManager ioMgr = new GridCacheIoManager(); CacheAffinitySharedManager topMgr = new CacheAffinitySharedManager(); + GridCacheSharedTtlCleanupManager ttl = new GridCacheSharedTtlCleanupManager(); CacheJtaManagerAdapter jta = JTA.createOptional(); @@@ -2477,44 -2437,8 +2512,45 @@@ } /** + * Resets cache state after the cache has been moved to recovery state. + * + * @param cacheName Cache name. + * @return Future that will be completed when state is changed. + */ + public IgniteInternalFuture resetCacheState(String cacheName) { + IgniteCacheProxy proxy = jCacheProxies.get(maskNull(cacheName)); + + if (proxy == null || proxy.proxyClosed()) + return new GridFinishedFuture<>(); // No-op. + + checkEmptyTransactions(); + + DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, ctx.localNodeId()); + + t.markResetLostPartitions(); + + return F.first(initiateCacheChanges(F.asList(t), false)); + } + + /** + * Changes global cluster state. + * + * @param state Cache state. + * @return Future that will be completed when state is changed. + */ + public IgniteInternalFuture changeGlobalState(CacheState state) { + checkEmptyTransactions(); + + DynamicCacheChangeRequest t = new DynamicCacheChangeRequest(UUID.randomUUID(), null, ctx.localNodeId()); + + t.state(state); + + return F.first(initiateCacheChanges(F.asList(t), false)); + } + + /** * @param reqs Requests. + * @param failIfExists Fail if exists flag. * @return Collection of futures. */ @SuppressWarnings("TypeMayBeWeakened") http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index f01b6fd,117a5c3..384750f --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@@ -173,7 -157,7 +178,7 @@@ public class GridCacheSharedContext ext if (grp != null) qry.projection(grp); - fut = ctx.kernalContext().query().executeQuery(ctx, + fut = ctx.kernalContext().query().executeQuery(GridCacheQueryType.TEXT, p.getText(), ctx, new IgniteOutClosureX>>() { - @Override public CacheQueryFuture> applyx() throws IgniteCheckedException { + @Override public CacheQueryFuture> applyx() { return qry.execute(); } }, false); http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index ac002fc,1d60c42..17df059 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@@ -85,7 -88,9 +86,8 @@@ import org.apache.ignite.internal.util. import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.lang.GridMapEntry; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; + import org.apache.ignite.internal.util.typedef.T1; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 690bfeb,4adfa8b..a8d3f2d --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@@ -477,147 -484,77 +497,151 @@@ public class GridDistributedTxRemoteAda boolean replicate = cacheCtx.isDrEnabled(); - while (true) { - try { - GridCacheEntryEx cached = txEntry.cached(); + try { + while (true) { + try { + GridCacheEntryEx cached = txEntry.cached(); - if (cached == null) - txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key(), topologyVersion())); + if (cached == null) + txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key(), topologyVersion())); - if (near() && cacheCtx.dr().receiveEnabled()) { - cached.markObsolete(xidVer); + if (near() && cacheCtx.dr().receiveEnabled()) { + cached.markObsolete(xidVer); - break; - } + break; + } - GridNearCacheEntry nearCached = null; + GridNearCacheEntry nearCached = null; - if (updateNearCache(cacheCtx, txEntry.key(), topVer)) - nearCached = cacheCtx.dht().near().peekExx(txEntry.key()); + if (updateNearCache(cacheCtx, txEntry.key(), topVer)) + nearCached = cacheCtx.dht().near().peekExx(txEntry.key()); - if (!F.isEmpty(txEntry.entryProcessors())) - txEntry.cached().unswap(false); + if (!F.isEmpty(txEntry.entryProcessors())) + txEntry.cached().unswap(false); - IgniteBiTuple res = - applyTransformClosures(txEntry, false, ret); + IgniteBiTuple res = - applyTransformClosures(txEntry, false); ++ applyTransformClosures(txEntry, false, ret); - GridCacheOperation op = res.get1(); - CacheObject val = res.get2(); + GridCacheOperation op = res.get1(); + CacheObject val = res.get2(); - GridCacheVersion explicitVer = txEntry.conflictVersion(); + GridCacheVersion explicitVer = txEntry.conflictVersion(); - if (explicitVer == null) - explicitVer = writeVersion(); + if (explicitVer == null) + explicitVer = writeVersion(); - if (txEntry.ttl() == CU.TTL_ZERO) - op = DELETE; + if (txEntry.ttl() == CU.TTL_ZERO) + op = DELETE; - boolean conflictNeedResolve = cacheCtx.conflictNeedResolve(); + boolean conflictNeedResolve = cacheCtx.conflictNeedResolve(); - GridCacheVersionConflictContext conflictCtx = null; + GridCacheVersionConflictContext conflictCtx = null; - if (conflictNeedResolve) { - IgniteBiTuple - drRes = conflictResolve(op, txEntry, val, explicitVer, cached); + if (conflictNeedResolve) { + IgniteBiTuple + drRes = conflictResolve(op, txEntry, val, explicitVer, cached); - assert drRes != null; + assert drRes != null; - conflictCtx = drRes.get2(); + conflictCtx = drRes.get2(); - if (conflictCtx.isUseOld()) - op = NOOP; - else if (conflictCtx.isUseNew()) { - txEntry.ttl(conflictCtx.ttl()); - txEntry.conflictExpireTime(conflictCtx.expireTime()); - } - else if (conflictCtx.isMerge()) { - op = drRes.get1(); - val = txEntry.context().toCacheObject(conflictCtx.mergeValue()); - explicitVer = writeVersion(); + if (conflictCtx.isUseOld()) + op = NOOP; + else if (conflictCtx.isUseNew()) { + txEntry.ttl(conflictCtx.ttl()); + txEntry.conflictExpireTime(conflictCtx.expireTime()); + } + else if (conflictCtx.isMerge()) { + op = drRes.get1(); + val = txEntry.context().toCacheObject(conflictCtx.mergeValue()); + explicitVer = writeVersion(); - txEntry.ttl(conflictCtx.ttl()); - txEntry.conflictExpireTime(conflictCtx.expireTime()); + txEntry.ttl(conflictCtx.ttl()); + txEntry.conflictExpireTime(conflictCtx.expireTime()); + } + } + else + // Nullify explicit version so that innerSet/innerRemove will work as usual. + explicitVer = null; + + GridCacheVersion dhtVer = cached.isNear() ? writeVersion() : null; + + if (!near() && cctx.wal() != null && op != NOOP && op != RELOAD && op != READ) { + if (dataEntries == null) + dataEntries = new ArrayList<>(entries.size()); + + dataEntries.add( + new DataEntry( + cacheCtx.cacheId(), + txEntry.key(), + val, + op, + nearXidVersion(), + writeVersion(), + 0, + txEntry.key().partition(), + txEntry.updateCounter() + ) + ); } - } - else - // Nullify explicit version so that innerSet/innerRemove will work as usual. - explicitVer = null; - - GridCacheVersion dhtVer = cached.isNear() ? writeVersion() : null; - if (op == CREATE || op == UPDATE) { - // Invalidate only for near nodes (backups cannot be invalidated). - if (isSystemInvalidate() || (isInvalidate() && cacheCtx.isNear())) + if (op == CREATE || op == UPDATE) { + // Invalidate only for near nodes (backups cannot be invalidated). + if (isSystemInvalidate() || (isInvalidate() && cacheCtx.isNear())) + cached.innerRemove(this, + eventNodeId(), + nodeId, + false, + true, + true, + txEntry.keepBinary(), ++ txEntry.hasOldValue(), ++ txEntry.oldValue(), + topVer, + null, + replicate ? DR_BACKUP : DR_NONE, + near() ? null : explicitVer, + CU.subjectId(this, cctx), + resolveTaskName(), + dhtVer, + txEntry.updateCounter()); + else { + cached.innerSet(this, + eventNodeId(), + nodeId, + val, + false, + false, + txEntry.ttl(), + true, + true, + txEntry.keepBinary(), ++ txEntry.hasOldValue(), ++ txEntry.oldValue(), + topVer, + null, + replicate ? DR_BACKUP : DR_NONE, + txEntry.conflictExpireTime(), + near() ? null : explicitVer, + CU.subjectId(this, cctx), + resolveTaskName(), + dhtVer, + txEntry.updateCounter()); + + // Keep near entry up to date. + if (nearCached != null) { + CacheObject val0 = cached.valueBytes(); + + nearCached.updateOrEvict(xidVer, + val0, + cached.expireTime(), + cached.ttl(), + nodeId, + topVer); + } + } + } + else if (op == DELETE) { cached.innerRemove(this, eventNodeId(), nodeId, @@@ -652,82 -607,111 +678,86 @@@ topVer); } } - } - else if (op == DELETE) { - cached.innerRemove(this, - eventNodeId(), - nodeId, - false, - true, - true, - txEntry.keepBinary(), - txEntry.hasOldValue(), - txEntry.oldValue(), - topVer, - null, - replicate ? DR_BACKUP : DR_NONE, - near() ? null : explicitVer, - CU.subjectId(this, cctx), - resolveTaskName(), - dhtVer, - txEntry.updateCounter()); - - // Keep near entry up to date. - if (nearCached != null) - nearCached.updateOrEvict(xidVer, null, 0, 0, nodeId, topVer); - } - else if (op == RELOAD) { - CacheObject reloaded = cached.innerReload(); + else if (op == READ) { + assert near(); - if (nearCached != null) { - nearCached.innerReload(); - - nearCached.updateOrEvict(cached.version(), - reloaded, - cached.expireTime(), - cached.ttl(), - nodeId, - topVer); + if (log.isDebugEnabled()) + log.debug("Ignoring READ entry when committing: " + txEntry); } - } - else if (op == READ) { - assert near(); - - if (log.isDebugEnabled()) - log.debug("Ignoring READ entry when committing: " + txEntry); - } - // No-op. - else { - if (conflictCtx == null || !conflictCtx.isUseOld()) { - if (txEntry.ttl() != CU.TTL_NOT_CHANGED) - cached.updateTtl(null, txEntry.ttl()); - - if (nearCached != null) { - CacheObject val0 = cached.valueBytes(); - - nearCached.updateOrEvict(xidVer, - val0, - cached.expireTime(), - cached.ttl(), - nodeId, - topVer); + // No-op. + else { + if (conflictCtx == null || !conflictCtx.isUseOld()) { + if (txEntry.ttl() != CU.TTL_NOT_CHANGED) + cached.updateTtl(null, txEntry.ttl()); + + if (nearCached != null) { + CacheObject val0 = cached.valueBytes(); + + nearCached.updateOrEvict(xidVer, + val0, + cached.expireTime(), + cached.ttl(), + nodeId, + topVer); + } } } - } - // Assert after setting values as we want to make sure - // that if we replaced removed entries. - assert - txEntry.op() == READ || onePhaseCommit() || - // If candidate is not there, then lock was explicit - // and we simply allow the commit to proceed. - !cached.hasLockCandidateUnsafe(xidVer) || cached.lockedByUnsafe(xidVer) : - "Transaction does not own lock for commit [entry=" + cached + - ", tx=" + this + ']'; - - // Break out of while loop. - break; - } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Attempting to commit a removed entry (will retry): " + txEntry); + // Assert after setting values as we want to make sure + // that if we replaced removed entries. + assert + txEntry.op() == READ || onePhaseCommit() || + // If candidate is not there, then lock was explicit + // and we simply allow the commit to proceed. + !cached.hasLockCandidateUnsafe(xidVer) || cached.lockedByUnsafe(xidVer) : + "Transaction does not own lock for commit [entry=" + cached + + ", tx=" + this + ']'; + + // Break out of while loop. + break; + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Attempting to commit a removed entry (will retry): " + txEntry); - // Renew cached entry. - txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), topologyVersion())); + // Renew cached entry. + txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), topologyVersion())); + } } } - } - } - catch (Throwable ex) { - // In case of error, we still make the best effort to commit, - // as there is no way to rollback at this point. - err = new IgniteTxHeuristicCheckedException("Commit produced a runtime exception " + - "(all transaction entries will be invalidated): " + CU.txString(this), ex); + catch (Throwable ex) { + // In case of error, we still make the best effort to commit, + // as there is no way to rollback at this point. + err = new IgniteTxHeuristicCheckedException("Commit produced a runtime exception " + + "(all transaction entries will be invalidated): " + CU.txString(this), ex); + + U.error(log, "Commit failed.", err); - U.error(log, "Commit failed.", err); + uncommit(); - uncommit(); + state(UNKNOWN); - state(UNKNOWN); + if (ex instanceof Error) + throw (Error)ex; + } ++ finally { ++ if (wrapper != null) ++ wrapper.initialize(ret); ++ } + } - if (ex instanceof Error) - throw (Error)ex; + if (!near() && cctx.wal() != null) + cctx.wal().log(new DataRecord(dataEntries)); + if (ptr != null) + cctx.wal().fsync(ptr); + } + catch (StorageException e) { + throw new IgniteCheckedException("Failed to log transaction record " + + "(transaction will be rolled back): " + this, e); } finally { - if (wrapper != null) - wrapper.initialize(ret); + cctx.database().checkpointReadUnlock(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index a6f41ac,816132d..88cd16b --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@@ -97,8 -98,11 +100,11 @@@ public class GridClientPartitionTopolog private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); /** Partition update counters. */ - private Map cntrMap = new HashMap<>(); + private Map> cntrMap = new HashMap<>(); + /** */ + private final Object similarAffKey; + /** * @param cctx Context. * @param cacheId Cache ID. @@@ -127,9 -134,16 +136,16 @@@ } /** + * @return Key to find caches with similar affinity. + */ + @Nullable public Object similarAffinityKey() { + return similarAffKey; + } + + /** * @return Full map string representation. */ - @SuppressWarnings( {"ConstantConditions"}) + @SuppressWarnings({"ConstantConditions"}) private String fullMapString() { return node2part == null ? "null" : FULL_MAP_DEBUG ? node2part.toFullString() : node2part.toString(); } @@@ -533,9 -550,9 +550,9 @@@ /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, + @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionFullMap partMap, - Map cntrMap) { + Map> cntrMap) { if (log.isDebugEnabled()) log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']'); @@@ -626,10 -643,10 +643,10 @@@ } /** {@inheritDoc} */ - @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) - @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId, + @Nullable @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionMap2 parts, - Map> cntrMap) { - Map cntrMap, ++ Map> cntrMap, + boolean checkEvictions) { if (log.isDebugEnabled()) log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); @@@ -674,44 -692,46 +691,46 @@@ long updateSeq = this.updateSeq.incrementAndGet(); - node2part = new GridDhtPartitionFullMap(node2part, updateSeq); - - boolean changed = false; + node2part.updateSequence(updateSeq); - if (cur == null || !cur.equals(parts)) - changed = true; + boolean changed = cur == null || !cur.equals(parts); - node2part.put(parts.nodeId(), parts); + if (changed) { + node2part.put(parts.nodeId(), parts); - part2node = new HashMap<>(part2node); + // Add new mappings. + for (Integer p : parts.keySet()) { + Set ids = part2node.get(p); - // Add new mappings. - for (Integer p : parts.keySet()) { - Set ids = part2node.get(p); + if (ids == null) + // Initialize HashSet to size 3 in anticipation that there won't be + // more than 3 nodes per partition. + part2node.put(p, ids = U.newHashSet(3)); - if (ids == null) - // Initialize HashSet to size 3 in anticipation that there won't be - // more than 3 nodes per partition. - part2node.put(p, ids = U.newHashSet(3)); + ids.add(parts.nodeId()); + } - changed |= ids.add(parts.nodeId()); - } + // Remove obsolete mappings. + if (cur != null) { + for (Integer p : cur.keySet()) { + if (parts.containsKey(p)) + continue; - // Remove obsolete mappings. - if (cur != null) { - for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) { - Set ids = part2node.get(p); + Set ids = part2node.get(p); - if (ids != null) - changed |= ids.remove(parts.nodeId()); + if (ids != null) + ids.remove(parts.nodeId()); + } } } + else + cur.updateSequence(parts.updateSequence(), parts.topologyVersion()); if (cntrMap != null) { - for (Map.Entry e : cntrMap.entrySet()) { - Long cntr = this.cntrMap.get(e.getKey()); + for (Map.Entry> e : cntrMap.entrySet()) { + T2 cntr = this.cntrMap.get(e.getKey()); - if (cntr == null || cntr < e.getValue()) + if (cntr == null || cntr.get2() < e.getValue().get2()) this.cntrMap.put(e.getKey(), e.getValue()); } } @@@ -729,24 -749,10 +748,29 @@@ } /** {@inheritDoc} */ + @Override public boolean detectLostPartitions(DiscoveryEvent discoEvt) { + assert false : "detectLostPartitions should never be called on client topology"; + + return false; + } + + /** {@inheritDoc} */ + @Override public void resetLostPartitions() { + assert false : "resetLostPartitions should never be called on client topology"; + } + + /** {@inheritDoc} */ + @Override public Collection lostPartitions() { + assert false : "lostPartitions should never be called on client topology"; + + return Collections.emptyList(); + } + ++ /** {@inheritDoc} */ + @Override public void checkEvictions() { + // No-op. + } + /** * Updates value for single partition. * @@@ -877,48 -882,22 +900,36 @@@ } /** {@inheritDoc} */ - @Nullable @Override public GridDhtPartitionMap2 partitions(UUID nodeId) { - @Override public Map updateCounters(boolean skipZeros) { -- lock.readLock().lock(); - - try { - return node2part.get(nodeId); - } - finally { - lock.readLock().unlock(); - } - } - - /** {@inheritDoc} */ + @Override public void setOwners(int p, Set owners, boolean updateSeq) { + lock.writeLock().lock(); try { - if (skipZeros) { - Map res = U.newHashMap(cntrMap.size()); - - for (Map.Entry e : cntrMap.entrySet()) { - if (!e.getValue().equals(ZERO)) - res.put(e.getKey(), e.getValue()); - } + for (Map.Entry e : node2part.entrySet()) { + if (!e.getValue().containsKey(p)) + continue; - return res; + if (e.getValue().get(p) == OWNING && !owners.contains(e.getKey())) + e.getValue().put(p, MOVING); + else if (owners.contains(e.getKey())) + e.getValue().put(p, OWNING); } - else - return new HashMap<>(cntrMap); + + part2node.put(p, owners); + + if (updateSeq) + this.updateSeq.incrementAndGet(); + } + finally { + lock.writeLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public Map> updateCounters() { + lock.readLock().lock(); + + try { + return new HashMap<>(cntrMap); } finally { lock.readLock().unlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8483e8fa/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ----------------------------------------------------------------------