Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 45C96200B4C for ; Fri, 22 Jul 2016 13:29:20 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 449D8160A77; Fri, 22 Jul 2016 11:29:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 10276160A5A for ; Fri, 22 Jul 2016 13:29:16 +0200 (CEST) Received: (qmail 42438 invoked by uid 500); 22 Jul 2016 11:29:16 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 41767 invoked by uid 99); 22 Jul 2016 11:29:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 22 Jul 2016 11:29:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D52CCE0844; Fri, 22 Jul 2016 11:29:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Fri, 22 Jul 2016 11:29:40 -0000 Message-Id: <92fca1bdefb142b393998e88c7345491@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [26/33] ignite git commit: GG-11293: .NET: Backported affinity functions feature to 7.5.30. archived-at: Fri, 22 Jul 2016 11:29:20 -0000 GG-11293: .NET: Backported affinity functions feature to 7.5.30. 
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5f57cc8d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5f57cc8d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5f57cc8d Branch: refs/heads/ignite-3547 Commit: 5f57cc8d703f9e8f749c9e3c403781365642dc3a Parents: 78d7c13 Author: vozerov-gridgain Authored: Tue Jul 19 14:34:35 2016 +0300 Committer: vozerov-gridgain Committed: Tue Jul 19 14:34:35 2016 +0300 ---------------------------------------------------------------------- .../internal/binary/GridBinaryMarshaller.java | 11 +- .../GridAffinityFunctionContextImpl.java | 9 + .../processors/cache/GridCacheProcessor.java | 65 +++-- .../affinity/PlatformAffinityFunction.java | 277 +++++++++++++++++++ .../PlatformAffinityFunctionTarget.java | 113 ++++++++ .../cache/affinity/PlatformAffinityUtils.java | 116 ++++++++ .../callback/PlatformCallbackGateway.java | 89 ++++++ .../callback/PlatformCallbackUtils.java | 49 ++++ .../PlatformDotNetConfigurationClosure.java | 115 +++++++- .../dotnet/PlatformDotNetAffinityFunction.java | 171 ++++++++++++ .../cpp/common/include/ignite/common/java.h | 18 ++ modules/platforms/cpp/common/src/java.cpp | 36 ++- .../Apache.Ignite.Core.Tests.csproj | 9 +- .../Affinity/AffinityFunctionSpringTest.cs | 184 ++++++++++++ .../Config/Cache/Affinity/affinity-function.xml | 129 +++++++++ .../Cache/Affinity/affinity-function2.xml | 49 ++++ .../Apache.Ignite.Core.Tests/TestRunner.cs | 3 +- .../Apache.Ignite.Core.csproj | 11 +- .../Cache/Affinity/AffinityFunctionBase.cs | 139 ++++++++++ .../Cache/Affinity/AffinityFunctionContext.cs | 120 ++++++++ .../Cache/Affinity/AffinityTopologyVersion.cs | 138 +++++++++ .../Cache/Affinity/Fair/FairAffinityFunction.cs | 32 +++ .../Cache/Affinity/IAffinityFunction.cs | 82 ++++++ .../Rendezvous/RendezvousAffinityFunction.cs | 31 +++ .../Apache.Ignite.Core/Events/EventReader.cs | 8 +- .../dotnet/Apache.Ignite.Core/Ignition.cs | 38 ++- 
.../Impl/Binary/BinaryReaderExtensions.cs | 14 + .../Impl/Binary/Marshaller.cs | 6 +- .../Affinity/AffinityFunctionSerializer.cs | 277 +++++++++++++++++++ .../Cache/Affinity/PlatformAffinityFunction.cs | 74 +++++ .../Impl/Common/ObjectInfoHolder.cs | 86 ++++++ .../Apache.Ignite.Core/Impl/IgniteUtils.cs | 10 +- .../Impl/Unmanaged/UnmanagedCallbackHandlers.cs | 6 + .../Impl/Unmanaged/UnmanagedCallbacks.cs | 133 ++++++++- 34 files changed, 2596 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java index 535207c..3a3dfd1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/GridBinaryMarshaller.java @@ -305,6 +305,15 @@ public class GridBinaryMarshaller { /** * Push binary context and return the old one. * + * @return Old binary context. + */ + public BinaryContext pushContext() { + return pushContext(ctx); + } + + /** + * Push binary context and return the old one. + * * @param ctx Binary context. * @return Old binary context. */ @@ -321,7 +330,7 @@ public class GridBinaryMarshaller { * * @param oldCtx Old binary context. 
*/ - private static void popContext(@Nullable BinaryContext oldCtx) { + public static void popContext(@Nullable BinaryContext oldCtx) { if (oldCtx == null) BINARY_CTX.remove(); else http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java index 6c97efd..4ddee00 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java @@ -80,4 +80,13 @@ public class GridAffinityFunctionContextImpl implements AffinityFunctionContext @Override public int backups() { return backups; } + + /** + * Gets the previous assignment. + * + * @return Previous assignment. 
+ */ + public List<List<ClusterNode>> prevAssignment() { + return prevAssignment; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 6484d4d..6761fac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -69,10 +69,13 @@ import org.apache.ignite.internal.IgniteComponentType; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.IgniteTransactionsEx; +import org.apache.ignite.internal.binary.BinaryContext; import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.binary.GridBinaryMarshaller; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCache; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; @@ -94,12 +97,14 @@ import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.processors.cache.transactions.IgniteTransactionsImpl; import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager; +import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.plugin.CachePluginManager; import org.apache.ignite.internal.processors.query.GridQueryProcessor; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.IgniteOutClosureX; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CIX1; import org.apache.ignite.internal.util.typedef.F; @@ -3379,32 +3384,60 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @throws IgniteCheckedException If validation failed. * @return Configuration copy. 
*/ - private CacheConfiguration cloneCheckSerializable(CacheConfiguration val) throws IgniteCheckedException { + private CacheConfiguration cloneCheckSerializable(final CacheConfiguration val) throws IgniteCheckedException { if (val == null) return null; - if (val.getCacheStoreFactory() != null) { - try { - ClassLoader ldr = ctx.config().getClassLoader(); + return withBinaryContext(new IgniteOutClosureX<CacheConfiguration>() { + @Override public CacheConfiguration applyx() throws IgniteCheckedException { + if (val.getCacheStoreFactory() != null) { + try { + ClassLoader ldr = ctx.config().getClassLoader(); - if (ldr == null) - ldr = val.getCacheStoreFactory().getClass().getClassLoader(); + if (ldr == null) + ldr = val.getCacheStoreFactory().getClass().getClassLoader(); - marshaller.unmarshal(marshaller.marshal(val.getCacheStoreFactory()), - U.resolveClassLoader(ldr, ctx.config())); - } - catch (IgniteCheckedException e) { - throw new IgniteCheckedException("Failed to validate cache configuration. " + - "Cache store factory is not serializable. Cache name: " + U.maskName(val.getName()), e); + marshaller.unmarshal(marshaller.marshal(val.getCacheStoreFactory()), + U.resolveClassLoader(ldr, ctx.config())); + } + catch (IgniteCheckedException e) { + throw new IgniteCheckedException("Failed to validate cache configuration. " + + "Cache store factory is not serializable. Cache name: " + U.maskName(val.getName()), e); + } + } + + try { + return marshaller.unmarshal(marshaller.marshal(val), U.resolveClassLoader(ctx.config())); + } + catch (IgniteCheckedException e) { + throw new IgniteCheckedException("Failed to validate cache configuration " + + "(make sure all objects in cache configuration are serializable): " + U.maskName(val.getName()), e); + } } + }); + } + + /** + * @param c Closure. + * @return Closure result. + * @throws IgniteCheckedException If failed. 
+ */ + private <T> T withBinaryContext(IgniteOutClosureX<T> c) throws IgniteCheckedException { + IgniteCacheObjectProcessor objProc = ctx.cacheObjects(); + BinaryContext oldCtx = null; + + if (objProc instanceof CacheObjectBinaryProcessorImpl) { + GridBinaryMarshaller binMarsh = ((CacheObjectBinaryProcessorImpl)objProc).marshaller(); + + oldCtx = binMarsh == null ? null : binMarsh.pushContext(); } try { - return marshaller.unmarshal(marshaller.marshal(val), U.resolveClassLoader(ctx.config())); + return c.applyx(); } - catch (IgniteCheckedException e) { - throw new IgniteCheckedException("Failed to validate cache configuration " + - "(make sure all objects in cache configuration are serializable): " + U.maskName(val.getName()), e); + finally { + if (objProc instanceof CacheObjectBinaryProcessorImpl) + GridBinaryMarshaller.popContext(oldCtx); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java new file mode 100644 index 0000000..6681e7a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache.affinity; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.binary.BinaryRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import org.apache.ignite.lifecycle.LifecycleAware; +import org.apache.ignite.resources.IgniteInstanceResource; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.List; +import java.util.UUID; + +/** + * Platform AffinityFunction. + */ +public class PlatformAffinityFunction implements AffinityFunction, Externalizable, LifecycleAware { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final byte FLAG_PARTITION = 1; + + /** */ + private static final byte FLAG_REMOVE_NODE = 1 << 1; + + /** */ + private static final byte FLAG_ASSIGN_PARTITIONS = 1 << 2; + + /** */ + private Object userFunc; + + /** + * Partition count. + * + * 1) Java calls partitions() method very early (before LifecycleAware.start) during CacheConfiguration validation. 
+ * 2) Partition count never changes. + * Therefore, we get the value on .NET side once, and pass it along with PlatformAffinity. + */ + private int partitions; + + /** */ + private AffinityFunction baseFunc; + + /** */ + private byte overrideFlags; + + /** */ + private transient Ignite ignite; + + /** */ + private transient PlatformContext ctx; + + /** */ + private transient long ptr; + + /** */ + private transient PlatformAffinityFunctionTarget baseTarget; + + + /** + * Ctor for serialization. + * + */ + public PlatformAffinityFunction() { + partitions = -1; + } + + /** + * Ctor. + * + * @param func User fun object. + * @param partitions Number of partitions. + */ + public PlatformAffinityFunction(Object func, int partitions, byte overrideFlags, AffinityFunction baseFunc) { + userFunc = func; + this.partitions = partitions; + this.overrideFlags = overrideFlags; + this.baseFunc = baseFunc; + } + + /** {@inheritDoc} */ + @Override public void reset() { + // userFunc is always in initial state (it is serialized only once on start). + if (baseFunc != null) + baseFunc.reset(); + } + + /** {@inheritDoc} */ + @Override public int partitions() { + // Affinity function can not return different number of partitions, + // so we pass this value once from the platform. 
+ assert partitions > 0; + + return partitions; + } + + /** {@inheritDoc} */ + @Override public int partition(Object key) { + if ((overrideFlags & FLAG_PARTITION) == 0) { + assert baseFunc != null; + + return baseFunc.partition(key); + } + + assert ctx != null; + assert ptr != 0; + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + BinaryRawWriterEx writer = ctx.writer(out); + + writer.writeObject(key); + + out.synchronize(); + + return ctx.gateway().affinityFunctionPartition(ptr, mem.pointer()); + } + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) { + if ((overrideFlags & FLAG_ASSIGN_PARTITIONS) == 0) { + assert baseFunc != null; + + return baseFunc.assignPartitions(affCtx); + } + + assert ctx != null; + assert ptr != 0; + assert affCtx != null; + + try (PlatformMemory outMem = ctx.memory().allocate()) { + try (PlatformMemory inMem = ctx.memory().allocate()) { + PlatformOutputStream out = outMem.output(); + BinaryRawWriterEx writer = ctx.writer(out); + + // Write previous assignment + PlatformAffinityUtils.writeAffinityFunctionContext(affCtx, writer, ctx); + + out.synchronize(); + + // Call platform + // We can not restore original AffinityFunctionContext after the call to platform, + // due to DiscoveryEvent (when node leaves, we can't get it by id anymore). + // Secondly, AffinityFunctionContext can't be changed by the user. 
+ if (baseTarget != null) + baseTarget.setCurrentAffinityFunctionContext(affCtx); + + try { + ctx.gateway().affinityFunctionAssignPartitions(ptr, outMem.pointer(), inMem.pointer()); + } + finally { + if (baseTarget != null) + baseTarget.setCurrentAffinityFunctionContext(null); + } + + // Read result + return PlatformAffinityUtils.readPartitionAssignment(ctx.reader(inMem), ctx); + } + } + } + + /** {@inheritDoc} */ + @Override public void removeNode(UUID nodeId) { + if ((overrideFlags & FLAG_REMOVE_NODE) == 0) { + assert baseFunc != null; + + baseFunc.removeNode(nodeId); + + return; + } + + assert ctx != null; + assert ptr != 0; + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + BinaryRawWriterEx writer = ctx.writer(out); + + writer.writeUuid(nodeId); + + out.synchronize(); + + ctx.gateway().affinityFunctionRemoveNode(ptr, mem.pointer()); + } + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(userFunc); + out.writeInt(partitions); + out.writeByte(overrideFlags); + out.writeObject(baseFunc); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + userFunc = in.readObject(); + partitions = in.readInt(); + overrideFlags = in.readByte(); + baseFunc = (AffinityFunction)in.readObject(); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + // userFunc is null when there is nothing overridden + if (userFunc == null) + return; + + assert ignite != null; + ctx = PlatformUtils.platformContext(ignite); + assert ctx != null; + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + BinaryRawWriterEx writer = ctx.writer(out); + + writer.writeObject(userFunc); + + out.synchronize(); + + baseTarget = baseFunc != null + ? 
new PlatformAffinityFunctionTarget(ctx, baseFunc) + : null; + + ptr = ctx.gateway().affinityFunctionInit(mem.pointer(), baseTarget); + } + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + if (ptr == 0) + return; + + assert ctx != null; + + ctx.gateway().affinityFunctionDestroy(ptr); + } + + /** + * Injects the Ignite. + * + * @param ignite Ignite. + */ + @SuppressWarnings("unused") + @IgniteInstanceResource + public void setIgnite(Ignite ignite) { + this.ignite = ignite; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java new file mode 100644 index 0000000..8a07b33 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache.affinity; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.binary.BinaryRawReaderEx; +import org.apache.ignite.internal.binary.BinaryRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.util.typedef.internal.U; + +import java.util.List; + +/** + * Platform affinity function target: + * to be invoked when Platform function calls base implementation of one of the AffinityFunction methods. + */ +public class PlatformAffinityFunctionTarget extends PlatformAbstractTarget { + /** */ + private static final int OP_PARTITION = 1; + + /** */ + private static final int OP_REMOVE_NODE = 2; + + /** */ + private static final int OP_ASSIGN_PARTITIONS = 3; + + /** Inner function to delegate calls to. */ + private final AffinityFunction baseFunc; + + /** Thread local to hold the current affinity function context. */ + private static final ThreadLocal<AffinityFunctionContext> currentAffCtx = new ThreadLocal<>(); + + /** + * Constructor. + * + * @param platformCtx Context. + * @param baseFunc Function to wrap. 
+ */ + protected PlatformAffinityFunctionTarget(PlatformContext platformCtx, AffinityFunction baseFunc) { + super(platformCtx); + + assert baseFunc != null; + this.baseFunc = baseFunc; + + try { + platformCtx.kernalContext().resource().injectGeneric(baseFunc); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + if (type == OP_PARTITION) + return baseFunc.partition(reader.readObjectDetached()); + else if (type == OP_REMOVE_NODE) { + baseFunc.removeNode(reader.readUuid()); + + return 0; + } + + return super.processInStreamOutLong(type, reader); + } + + /** {@inheritDoc} */ + @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + if (type == OP_ASSIGN_PARTITIONS) { + AffinityFunctionContext affCtx = currentAffCtx.get(); + + if (affCtx == null) + throw new IgniteException("Thread-local AffinityFunctionContext is null. " + + "This may indicate an unsupported call to the base AffinityFunction."); + + final List<List<ClusterNode>> partitions = baseFunc.assignPartitions(affCtx); + + PlatformAffinityUtils.writePartitionAssignment(partitions, writer, platformContext()); + + return; + } + + super.processOutStream(type, writer); + } + + /** + * Sets the context for current operation. + * + * @param ctx Context. 
+ */ + void setCurrentAffinityFunctionContext(AffinityFunctionContext ctx) { + currentAffCtx.set(ctx); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityUtils.java new file mode 100644 index 0000000..b1e1b23 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityUtils.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform.cache.affinity; + +import org.apache.ignite.binary.BinaryRawReader; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.binary.BinaryRawWriterEx; +import org.apache.ignite.internal.cluster.IgniteClusterEx; +import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; +import org.apache.ignite.internal.processors.platform.PlatformContext; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * Affinity serialization functions. + */ +public class PlatformAffinityUtils { + /** + * Writes the affinity function context. + * @param affCtx Affinity context. + * @param writer Writer. + * @param ctx Platform context. + */ + public static void writeAffinityFunctionContext(AffinityFunctionContext affCtx, BinaryRawWriterEx writer, + PlatformContext ctx) { + assert affCtx != null; + assert writer != null; + assert ctx != null; + + ctx.writeNodes(writer, affCtx.currentTopologySnapshot()); + + writer.writeInt(affCtx.backups()); + writer.writeLong(affCtx.currentTopologyVersion().topologyVersion()); + writer.writeInt(affCtx.currentTopologyVersion().minorTopologyVersion()); + + ctx.writeEvent(writer, affCtx.discoveryEvent()); + + // Write previous assignment + List<List<ClusterNode>> prevAssignment = ((GridAffinityFunctionContextImpl)affCtx).prevAssignment(); + + if (prevAssignment == null) + writer.writeInt(-1); + else { + writer.writeInt(prevAssignment.size()); + + for (List<ClusterNode> part : prevAssignment) + ctx.writeNodes(writer, part); + } + } + + /** + * Writes the partition assignment to a stream. + * + * @param partitions Partitions. + * @param writer Writer. 
+ */ + public static void writePartitionAssignment(Collection<List<ClusterNode>> partitions, BinaryRawWriterEx writer, + PlatformContext ctx) { + assert partitions != null; + assert writer != null; + + writer.writeInt(partitions.size()); + + for (List<ClusterNode> part : partitions) + ctx.writeNodes(writer, part); + } + + /** + * Reads the partition assignment. + * + * @param reader Reader. + * @param ctx Platform context. + * @return Partitions. + */ + public static List<List<ClusterNode>> readPartitionAssignment(BinaryRawReader reader, PlatformContext ctx) { + assert reader != null; + assert ctx != null; + + int partCnt = reader.readInt(); + + List<List<ClusterNode>> res = new ArrayList<>(partCnt); + + IgniteClusterEx cluster = ctx.kernalContext().grid().cluster(); + + for (int i = 0; i < partCnt; i++) { + int partSize = reader.readInt(); + + List<ClusterNode> part = new ArrayList<>(partSize); + + for (int j = 0; j < partSize; j++) + part.add(cluster.node(reader.readUuid())); + + res.add(part); + } + + return res; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityUtils.java new file mode 100644 index 0000000..b1e1b23 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityUtils.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
org.apache.ignite.internal.util.GridStripedSpinBusyLock; /** @@ -920,6 +921,94 @@ public class PlatformCallbackGateway { } /** + * Initializes affinity function. + * + * @param memPtr Pointer to a stream with serialized affinity function. + * @param baseFunc Optional func for base calls. + * @return Affinity function pointer. + */ + public long affinityFunctionInit(long memPtr, PlatformAffinityFunctionTarget baseFunc) { + enter(); + + try { + return PlatformCallbackUtils.affinityFunctionInit(envPtr, memPtr, baseFunc); + } + finally { + leave(); + } + } + + /** + * Gets the partition from affinity function. + * + * @param ptr Affinity function pointer. + * @param memPtr Pointer to a stream with key object. + * @return Partition number for a given key. + */ + public int affinityFunctionPartition(long ptr, long memPtr) { + enter(); + + try { + return PlatformCallbackUtils.affinityFunctionPartition(envPtr, ptr, memPtr); + } + finally { + leave(); + } + } + + /** + * Assigns the affinity partitions. + * + * @param ptr Affinity function pointer. + * @param outMemPtr Pointer to a stream with affinity context. + * @param inMemPtr Pointer to a stream with result. + */ + public void affinityFunctionAssignPartitions(long ptr, long outMemPtr, long inMemPtr){ + enter(); + + try { + PlatformCallbackUtils.affinityFunctionAssignPartitions(envPtr, ptr, outMemPtr, inMemPtr); + } + finally { + leave(); + } + } + + /** + * Removes the node from affinity function. + * + * @param ptr Affinity function pointer. + * @param memPtr Pointer to a stream with node id. + */ + public void affinityFunctionRemoveNode(long ptr, long memPtr) { + enter(); + + try { + PlatformCallbackUtils.affinityFunctionRemoveNode(envPtr, ptr, memPtr); + } + finally { + leave(); + } + } + + /** + * Destroys the affinity function. + * + * @param ptr Affinity function pointer. + */ + public void affinityFunctionDestroy(long ptr) { + if (!lock.enterBusy()) + return; // skip: destroy is not necessary during shutdown. 
+ + try { + PlatformCallbackUtils.affinityFunctionDestroy(envPtr, ptr); + } + finally { + leave(); + } + } + + /** * Enter gateway. */ protected void enter() { http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java index 7f3ba6f..1cbbd7e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.platform.callback; +import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunctionTarget; + /** * Platform callback utility methods. Implemented in target platform. All methods in this class must be * package-visible and invoked only through {@link PlatformCallbackGateway}. @@ -482,6 +484,53 @@ public class PlatformCallbackUtils { static native long extensionCallbackInLongLongOutLong(long envPtr, int typ, long arg1, long arg2); /** + * Initializes affinity function. + * + * @param envPtr Environment pointer. + * @param memPtr Pointer to a stream with serialized affinity function. + * @param baseFunc Optional func for base calls. + * @return Affinity function pointer. + */ + static native long affinityFunctionInit(long envPtr, long memPtr, PlatformAffinityFunctionTarget baseFunc); + + /** + * Gets the partition from affinity function. + * + * @param envPtr Environment pointer. + * @param ptr Affinity function pointer. + * @param memPtr Pointer to a stream with key object. 
+ * @return Partition number for a given key. + */ + static native int affinityFunctionPartition(long envPtr, long ptr, long memPtr); + + /** + * Assigns the affinity partitions. + * + * @param envPtr Environment pointer. + * @param ptr Affinity function pointer. + * @param outMemPtr Pointer to a stream with affinity context. + * @param inMemPtr Pointer to a stream with result. + */ + static native void affinityFunctionAssignPartitions(long envPtr, long ptr, long outMemPtr, long inMemPtr); + + /** + * Removes the node from affinity function. + * + * @param envPtr Environment pointer. + * @param ptr Affinity function pointer. + * @param memPtr Pointer to a stream with node id. + */ + static native void affinityFunctionRemoveNode(long envPtr, long ptr, long memPtr); + + /** + * Destroys the affinity function. + * + * @param envPtr Environment pointer. + * @param ptr Affinity function pointer. + */ + static native void affinityFunctionDestroy(long envPtr, long ptr); + + /** * Private constructor. 
*/ private PlatformCallbackUtils() { http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java index 6b9b441..f441f4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java @@ -19,21 +19,28 @@ package org.apache.ignite.internal.processors.platform.dotnet; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; -import org.apache.ignite.binary.BinaryIdMapper; import org.apache.ignite.binary.BinaryBasicIdMapper; -import org.apache.ignite.binary.BinaryNameMapper; import org.apache.ignite.binary.BinaryBasicNameMapper; +import org.apache.ignite.binary.BinaryIdMapper; +import org.apache.ignite.binary.BinaryNameMapper; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.configuration.BinaryConfiguration; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.PlatformConfiguration; import org.apache.ignite.internal.MarshallerContextImpl; +import org.apache.ignite.internal.binary.BinaryContext; +import org.apache.ignite.internal.binary.BinaryMarshaller; import 
org.apache.ignite.internal.binary.BinaryNoopMetadataHandler; +import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; +import org.apache.ignite.internal.binary.BinaryReaderExImpl; import org.apache.ignite.internal.binary.GridBinaryMarshaller; -import org.apache.ignite.internal.binary.BinaryContext; import org.apache.ignite.internal.processors.platform.PlatformAbstractConfigurationClosure; +import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunction; import org.apache.ignite.internal.processors.platform.lifecycle.PlatformLifecycleBean; -import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManagerImpl; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; @@ -42,8 +49,8 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lifecycle.LifecycleBean; import org.apache.ignite.logger.NullLogger; import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.platform.dotnet.PlatformDotNetAffinityFunction; import org.apache.ignite.platform.dotnet.PlatformDotNetConfiguration; -import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.platform.dotnet.PlatformDotNetLifecycleBean; import java.util.ArrayList; @@ -183,7 +190,9 @@ public class PlatformDotNetConfigurationClosure extends PlatformAbstractConfigur try (PlatformMemory inMem = memMgr.allocate()) { PlatformOutputStream out = outMem.output(); - BinaryRawWriterEx writer = marshaller().writer(out); + final GridBinaryMarshaller marshaller = marshaller(); + + BinaryRawWriterEx writer = marshaller.writer(out); PlatformUtils.writeDotNetConfiguration(writer, interopCfg.unwrap()); @@ -196,12 +205,24 @@ public class PlatformDotNetConfigurationClosure extends 
PlatformAbstractConfigur writer.writeMap(bean.getProperties()); } + // Write .NET affinity funcs + List affFuncs = affinityFunctions(igniteCfg); + + writer.writeInt(affFuncs.size()); + + for (PlatformDotNetAffinityFunction func : affFuncs) { + writer.writeString(func.getTypeName()); + writer.writeMap(func.getProperties()); + } + out.synchronize(); gate.extensionCallbackInLongLongOutLong( PlatformUtils.OP_PREPARE_DOT_NET, outMem.pointer(), inMem.pointer()); - processPrepareResult(inMem.input()); + BinaryReaderExImpl reader = new BinaryReaderExImpl(marshaller.context(), inMem.input(), null); + + processPrepareResult(reader); } } } @@ -211,7 +232,7 @@ public class PlatformDotNetConfigurationClosure extends PlatformAbstractConfigur * * @param in Input stream. */ - private void processPrepareResult(PlatformInputStream in) { + private void processPrepareResult(BinaryReaderExImpl in) { assert cfg != null; List beans = beans(cfg); @@ -245,6 +266,63 @@ public class PlatformDotNetConfigurationClosure extends PlatformAbstractConfigur cfg.setLifecycleBeans(mergedBeans); } } + + // Process affinity functions + List affFuncs = affinityFunctions(cfg); + + if (!affFuncs.isEmpty()) { + for (PlatformDotNetAffinityFunction aff : affFuncs) + aff.init(readAffinityFunction(in)); + } + } + + /** + * Reads the affinity function. + * + * @param in Stream. + * @return Affinity function. 
+ */ + private static PlatformAffinityFunction readAffinityFunction(BinaryRawReaderEx in) { + byte plcTyp = in.readByte(); + + if (plcTyp == 0) + return null; + + int partitions = in.readInt(); + boolean exclNeighbours = in.readBoolean(); + byte overrideFlags = in.readByte(); + Object userFunc = in.readObjectDetached(); + + AffinityFunction baseFunc = null; + + switch (plcTyp) { + case 1: { + FairAffinityFunction f = new FairAffinityFunction(); + + f.setPartitions(partitions); + f.setExcludeNeighbors(exclNeighbours); + + baseFunc = f; + + break; + } + + case 2: { + RendezvousAffinityFunction f = new RendezvousAffinityFunction(); + + f.setPartitions(partitions); + f.setExcludeNeighbors(exclNeighbours); + + baseFunc = f; + + break; + } + + default: + assert plcTyp == 3 : "Unknown affinity function policy type: " + plcTyp; + } + + return new PlatformAffinityFunction(userFunc, partitions, overrideFlags, baseFunc); } /** @@ -289,4 +367,25 @@ public class PlatformDotNetConfigurationClosure extends PlatformAbstractConfigur throw U.convertException(e); } } + + /** + * Find .NET affinity functions in configuration. + * + * @param cfg Configuration. + * @return affinity functions. 
+ */ + private static List affinityFunctions(IgniteConfiguration cfg) { + List res = new ArrayList<>(); + + CacheConfiguration[] cacheCfg = cfg.getCacheConfiguration(); + + if (cacheCfg != null) { + for (CacheConfiguration ccfg : cacheCfg) { + if (ccfg.getAffinity() instanceof PlatformDotNetAffinityFunction) + res.add((PlatformDotNetAffinityFunction)ccfg.getAffinity()); + } + } + + return res; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/core/src/main/java/org/apache/ignite/platform/dotnet/PlatformDotNetAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/platform/dotnet/PlatformDotNetAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/platform/dotnet/PlatformDotNetAffinityFunction.java new file mode 100644 index 0000000..254c379 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/platform/dotnet/PlatformDotNetAffinityFunction.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.platform.dotnet; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunction; +import org.apache.ignite.lifecycle.LifecycleAware; +import org.apache.ignite.resources.IgniteInstanceResource; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * AffinityFunction implementation which can be used to configure .NET affinity function in Java Spring configuration. + */ +public class PlatformDotNetAffinityFunction implements AffinityFunction, Externalizable, LifecycleAware { + /** */ + private static final long serialVersionUID = 0L; + + /** .NET type name. */ + private transient String typName; + + /** Properties. */ + private transient Map props; + + /** Inner function. */ + private PlatformAffinityFunction func; + + /** + * Gets .NET type name. + * + * @return .NET type name. + */ + public String getTypeName() { + return typName; + } + + /** + * Sets .NET type name. + * + * @param typName .NET type name. + */ + public void setTypeName(String typName) { + this.typName = typName; + } + + /** + * Get properties. + * + * @return Properties. + */ + public Map getProperties() { + return props; + } + + /** + * Set properties. + * + * @param props Properties. 
+ */ + public void setProperties(Map props) { + this.props = props; + } + + /** {@inheritDoc} */ + @Override public void reset() { + assert func != null; + + func.reset(); + } + + /** {@inheritDoc} */ + @Override public int partitions() { + assert func != null; + + return func.partitions(); + } + + /** {@inheritDoc} */ + @Override public int partition(Object key) { + assert func != null; + + return func.partition(key); + } + + /** {@inheritDoc} */ + @Override public List> assignPartitions(AffinityFunctionContext affCtx) { + assert func != null; + + return func.assignPartitions(affCtx); + } + + /** {@inheritDoc} */ + @Override public void removeNode(UUID nodeId) { + assert func != null; + + func.removeNode(nodeId); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(func); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + func = (PlatformAffinityFunction) in.readObject(); + } + + /** + * Initializes this instance. + * + * @param func Underlying func. + */ + public void init(PlatformAffinityFunction func) { + assert func != null; + + this.func = func; + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + assert func != null; + + func.start(); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + assert func != null; + + func.stop(); + } + + /** + * Injects the Ignite. + * + * @param ignite Ignite. 
+ */ + @SuppressWarnings("unused") + @IgniteInstanceResource + private void setIgnite(Ignite ignite) { + assert func != null; + + func.setIgnite(ignite); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/cpp/common/include/ignite/common/java.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/include/ignite/common/java.h b/modules/platforms/cpp/common/include/ignite/common/java.h index e629c77..91caddd 100644 --- a/modules/platforms/cpp/common/include/ignite/common/java.h +++ b/modules/platforms/cpp/common/include/ignite/common/java.h @@ -103,6 +103,12 @@ namespace ignite typedef long long(JNICALL *ExtensionCallbackInLongOutLongHandler)(void* target, int typ, long long arg1); typedef long long(JNICALL *ExtensionCallbackInLongLongOutLongHandler)(void* target, int typ, long long arg1, long long arg2); + typedef long long(JNICALL *AffinityFunctionInitHandler)(void* target, long long memPtr, void* baseFunc); + typedef int(JNICALL *AffinityFunctionPartitionHandler)(void* target, long long ptr, long long memPtr); + typedef void(JNICALL *AffinityFunctionAssignPartitionsHandler)(void* target, long long ptr, long long inMemPtr, long long outMemPtr); + typedef void(JNICALL *AffinityFunctionRemoveNodeHandler)(void* target, long long ptr, long long memPtr); + typedef void(JNICALL *AffinityFunctionDestroyHandler)(void* target, long long ptr); + /** * JNI handlers holder. 
*/ @@ -177,6 +183,12 @@ namespace ignite ExtensionCallbackInLongOutLongHandler extensionCallbackInLongOutLong; ExtensionCallbackInLongLongOutLongHandler extensionCallbackInLongLongOutLong; + + AffinityFunctionInitHandler affinityFunctionInit; + AffinityFunctionPartitionHandler affinityFunctionPartition; + AffinityFunctionAssignPartitionsHandler affinityFunctionAssignPartitions; + AffinityFunctionRemoveNodeHandler affinityFunctionRemoveNode; + AffinityFunctionDestroyHandler affinityFunctionDestroy; }; /** @@ -683,6 +695,12 @@ namespace ignite JNIEXPORT jlong JNICALL JniExtensionCallbackInLongOutLong(JNIEnv *env, jclass cls, jlong envPtr, jint typ, jlong arg1); JNIEXPORT jlong JNICALL JniExtensionCallbackInLongLongOutLong(JNIEnv *env, jclass cls, jlong envPtr, jint typ, jlong arg1, jlong arg2); + + JNIEXPORT jlong JNICALL JniAffinityFunctionInit(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr, jobject baseFunc); + JNIEXPORT jint JNICALL JniAffinityFunctionPartition(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr); + JNIEXPORT void JNICALL JniAffinityFunctionAssignPartitions(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong inMemPtr, jlong outMemPtr); + JNIEXPORT void JNICALL JniAffinityFunctionRemoveNode(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr); + JNIEXPORT void JNICALL JniAffinityFunctionDestroy(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/cpp/common/src/java.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/src/java.cpp b/modules/platforms/cpp/common/src/java.cpp index 63deba5..789b6a3 100644 --- a/modules/platforms/cpp/common/src/java.cpp +++ b/modules/platforms/cpp/common/src/java.cpp @@ -346,6 +346,12 @@ namespace ignite JniMethod M_PLATFORM_CALLBACK_UTILS_EXTENSION_CALLBACK_IN_LONG_OUT_LONG = JniMethod("extensionCallbackInLongOutLong", "(JIJ)J", true); 
JniMethod M_PLATFORM_CALLBACK_UTILS_EXTENSION_CALLBACK_IN_LONG_LONG_OUT_LONG = JniMethod("extensionCallbackInLongLongOutLong", "(JIJJ)J", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT = JniMethod("affinityFunctionInit", "(JJLorg/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget;)J", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_PARTITION = JniMethod("affinityFunctionPartition", "(JJJ)I", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_ASSIGN_PARTITIONS = JniMethod("affinityFunctionAssignPartitions", "(JJJJ)V", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_REMOVE_NODE = JniMethod("affinityFunctionRemoveNode", "(JJJ)V", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_DESTROY = JniMethod("affinityFunctionDestroy", "(JJ)V", true); + const char* C_PLATFORM_UTILS = "org/apache/ignite/internal/processors/platform/utils/PlatformUtils"; JniMethod M_PLATFORM_UTILS_REALLOC = JniMethod("reallocate", "(JI)V", true); JniMethod M_PLATFORM_UTILS_ERR_DATA = JniMethod("errorData", "(Ljava/lang/Throwable;)[B", true); @@ -766,7 +772,7 @@ namespace ignite void RegisterNatives(JNIEnv* env) { { - JNINativeMethod methods[52]; + JNINativeMethod methods[57]; int idx = 0; @@ -840,6 +846,12 @@ namespace ignite AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_EXTENSION_CALLBACK_IN_LONG_OUT_LONG, reinterpret_cast(JniExtensionCallbackInLongOutLong)); AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_EXTENSION_CALLBACK_IN_LONG_LONG_OUT_LONG, reinterpret_cast(JniExtensionCallbackInLongLongOutLong)); + AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT, reinterpret_cast(JniAffinityFunctionInit)); + AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_PARTITION, reinterpret_cast(JniAffinityFunctionPartition)); + AddNativeMethod(methods + idx++, 
M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_ASSIGN_PARTITIONS, reinterpret_cast(JniAffinityFunctionAssignPartitions)); + AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_REMOVE_NODE, reinterpret_cast(JniAffinityFunctionRemoveNode)); + AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_DESTROY, reinterpret_cast(JniAffinityFunctionDestroy)); + jint res = env->RegisterNatives(FindClass(env, C_PLATFORM_CALLBACK_UTILS), methods, idx); if (res != JNI_OK) @@ -2471,6 +2483,26 @@ namespace ignite JNIEXPORT jlong JNICALL JniExtensionCallbackInLongLongOutLong(JNIEnv *env, jclass cls, jlong envPtr, jint typ, jlong arg1, jlong arg2) { IGNITE_SAFE_FUNC(env, envPtr, ExtensionCallbackInLongLongOutLongHandler, extensionCallbackInLongLongOutLong, typ, arg1, arg2); } - } + + JNIEXPORT jlong JNICALL JniAffinityFunctionInit(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr, jobject baseFunc) { + void* baseFuncRef = baseFunc ? env->NewGlobalRef(baseFunc) : nullptr; + IGNITE_SAFE_FUNC(env, envPtr, AffinityFunctionInitHandler, affinityFunctionInit, memPtr, baseFuncRef); + } + + JNIEXPORT jint JNICALL JniAffinityFunctionPartition(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr) { + IGNITE_SAFE_FUNC(env, envPtr, AffinityFunctionPartitionHandler, affinityFunctionPartition, ptr, memPtr); + } + + JNIEXPORT void JNICALL JniAffinityFunctionAssignPartitions(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong inMemPtr, jlong outMemPtr) { + IGNITE_SAFE_PROC(env, envPtr, AffinityFunctionAssignPartitionsHandler, affinityFunctionAssignPartitions, ptr, inMemPtr, outMemPtr); + } + + JNIEXPORT void JNICALL JniAffinityFunctionRemoveNode(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr) { + IGNITE_SAFE_PROC(env, envPtr, AffinityFunctionRemoveNodeHandler, affinityFunctionRemoveNode, ptr, memPtr); + } + + JNIEXPORT void JNICALL JniAffinityFunctionDestroy(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr) { + 
IGNITE_SAFE_PROC(env, envPtr, AffinityFunctionDestroyHandler, affinityFunctionDestroy, ptr); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj index 0194450..89cd2a7 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj @@ -60,6 +60,7 @@ + @@ -175,6 +176,12 @@ Always Designer + + PreserveNewest + + + PreserveNewest + Always @@ -262,4 +269,4 @@ copy /Y $(SolutionDir)Apache.Ignite\bin\$(PlatformName)\$(ConfigurationName)\Apa --> - + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionSpringTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionSpringTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionSpringTest.cs new file mode 100644 index 0000000..7b317ac --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionSpringTest.cs @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// ReSharper disable UnusedAutoPropertyAccessor.Local +// ReSharper disable UnusedMember.Local +namespace Apache.Ignite.Core.Tests.Cache.Affinity +{ + using System; + using System.Collections.Generic; + using System.Linq; + using Apache.Ignite.Core.Cache; + using Apache.Ignite.Core.Cache.Affinity; + using Apache.Ignite.Core.Cache.Affinity.Fair; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Resource; + using NUnit.Framework; + + /// + /// Tests AffinityFunction defined in Spring XML. + /// + public class AffinityFunctionSpringTest : IgniteTestBase + { + /// + /// Initializes a new instance of the class. + /// + public AffinityFunctionSpringTest() : base(6, + "config\\cache\\affinity\\affinity-function.xml", + "config\\cache\\affinity\\affinity-function2.xml") + { + // No-op. + } + + /// + /// Tests the static cache. + /// + [Test] + public void TestStaticCache() + { + ValidateAffinityFunction(Grid.GetCache("cache1")); + ValidateAffinityFunction(Grid2.GetCache("cache1")); + ValidateAffinityFunction(Grid.GetCache("cache2")); + ValidateAffinityFunction(Grid2.GetCache("cache2")); + } + + /// + /// Tests the dynamic cache. 
+ /// + [Test] + public void TestDynamicCache() + { + ValidateAffinityFunction(Grid.CreateCache("dyn-cache-1")); + ValidateAffinityFunction(Grid2.GetCache("dyn-cache-1")); + + ValidateAffinityFunction(Grid2.CreateCache("dyn-cache-2")); + ValidateAffinityFunction(Grid.GetCache("dyn-cache-2")); + + ValidateAffinityFunction(Grid.CreateCache("dyn-cache2-1")); + ValidateAffinityFunction(Grid2.GetCache("dyn-cache2-1")); + + ValidateAffinityFunction(Grid2.CreateCache("dyn-cache2-2")); + ValidateAffinityFunction(Grid.GetCache("dyn-cache2-2")); + } + + /// + /// Validates the affinity function. + /// + /// The cache. + private static void ValidateAffinityFunction(ICache cache) + { + var aff = cache.Ignite.GetAffinity(cache.Name); + + Assert.AreEqual(5, aff.Partitions); + + // Predefined map + Assert.AreEqual(2, aff.GetPartition(1L)); + Assert.AreEqual(1, aff.GetPartition(2L)); + + // Other keys + Assert.AreEqual(1, aff.GetPartition(13L)); + Assert.AreEqual(3, aff.GetPartition(4L)); + } + + private class TestFunc : IAffinityFunction // [Serializable] is not necessary + { + [InstanceResource] + private readonly IIgnite _ignite = null; + + private int Property1 { get; set; } + + private string Property2 { get; set; } + + public int Partitions + { + get { return 5; } + } + + public int GetPartition(object key) + { + Assert.IsNotNull(_ignite); + Assert.AreEqual(1, Property1); + Assert.AreEqual("1", Property2); + + var longKey = (long)key; + int res; + + if (TestFairFunc.PredefinedParts.TryGetValue(longKey, out res)) + return res; + + return (int)(longKey * 2 % 5); + } + + // ReSharper disable once UnusedParameter.Local + public void RemoveNode(Guid nodeId) + { + // No-op. 
+ } + + public IEnumerable> AssignPartitions(AffinityFunctionContext context) + { + return Enumerable.Range(0, Partitions).Select(x => context.CurrentTopologySnapshot); + } + } + + private class TestFairFunc : FairAffinityFunction // [Serializable] is not necessary + { + public static readonly Dictionary PredefinedParts = new Dictionary + { + {1, 2}, + {2, 1} + }; + + [InstanceResource] + private readonly IIgnite _ignite = null; + + private int Property1 { get; set; } + + private string Property2 { get; set; } + + public override int GetPartition(object key) + { + Assert.IsNotNull(_ignite); + Assert.AreEqual(1, Property1); + Assert.AreEqual("1", Property2); + + Assert.IsInstanceOf(key); + + var basePart = base.GetPartition(key); + Assert.Greater(basePart, -1); + Assert.Less(basePart, Partitions); + + var longKey = (long) key; + int res; + + if (PredefinedParts.TryGetValue(longKey, out res)) + return res; + + return (int) (longKey * 2 % 5); + } + + public override IEnumerable> AssignPartitions(AffinityFunctionContext context) + { + var baseRes = base.AssignPartitions(context).ToList(); // test base call + + Assert.AreEqual(Partitions, baseRes.Count); + + return baseRes; + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function.xml ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function.xml new file mode 100644 index 0000000..67ff128 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function.xml @@ -0,0 +1,129 @@ + + + + + + + + + + + + + + + + + + + + + + + 1 + + + + + + + + + + + + + + + + + 1 + + + + + + + + + + + + + + + + + 1 + + + + 5 + + + + + + + + + + + + + + + + 1 + + + + 5 + + + + + + + + + 
+ + + + + + + + 127.0.0.1:47500 + + + + + + + + + http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function2.xml ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function2.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function2.xml new file mode 100644 index 0000000..cab34b5 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function2.xml @@ -0,0 +1,49 @@ + + + + + + + + + + + + + + + + + + 127.0.0.1:47500 + + + + + + + + + http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs index 2b0ab8e..95be6dc 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs @@ -20,6 +20,7 @@ namespace Apache.Ignite.Core.Tests using System; using System.Diagnostics; using System.Reflection; + using Apache.Ignite.Core.Tests.Cache.Affinity; using Apache.Ignite.Core.Tests.Memory; using NUnit.ConsoleRunner; @@ -33,7 +34,7 @@ namespace Apache.Ignite.Core.Tests //TestOne(typeof(ContinuousQueryAtomiclBackupTest), "TestInitialQuery"); - TestAll(typeof (ExecutableTest)); + TestAll(typeof (AffinityFunctionSpringTest)); //TestAllInAssembly(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj 
b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index 05a7fa7..6793873 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -55,6 +55,12 @@ + + + + + + @@ -108,7 +114,10 @@ + + + @@ -414,4 +423,4 @@ --> - + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/5f57cc8d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs new file mode 100644 index 0000000..ce2e5e1 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+namespace Apache.Ignite.Core.Cache.Affinity
+{
+    using System;
+    using System.Collections.Generic;
+    using System.ComponentModel;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Common;
+
+    /// <summary>
+    /// Base class for predefined affinity functions.
+    /// </summary>
+    [Serializable]
+    public abstract class AffinityFunctionBase : IAffinityFunction
+    {
+        /// <summary> The default value for <see cref="Partitions"/> property. </summary>
+        public const int DefaultPartitions = 1024;
+
+        /** Backing field for the Partitions property. */
+        private int _partitions = DefaultPartitions;
+
+        /** Function that all calls are delegated to; null until SetBaseFunction is called. */
+        private IAffinityFunction _baseFunction;
+
+
+        /// <summary>
+        /// Gets or sets the total number of partitions.
+        /// </summary>
+        [DefaultValue(DefaultPartitions)]
+        public virtual int Partitions
+        {
+            get { return _partitions; }
+            set { _partitions = value; }
+        }
+
+        /// <summary>
+        /// Gets partition number for a given key starting from 0. Partitioned caches
+        /// should make sure that keys are about evenly distributed across all partitions
+        /// from 0 to <see cref="Partitions"/> for best performance.
+        /// <para />
+        /// Note that for fully replicated caches it is possible to segment key sets among different
+        /// grid node groups. In that case each node group should return a unique partition
+        /// number. However, unlike partitioned cache, mappings of keys to nodes in
+        /// replicated caches are constant and a node cannot migrate from one partition
+        /// to another.
+        /// </summary>
+        /// <param name="key">Key to get partition for.</param>
+        /// <returns>
+        /// Partition number for a given key.
+        /// </returns>
+        public virtual int GetPartition(object key)
+        {
+            ThrowIfUninitialized();
+
+            return _baseFunction.GetPartition(key);
+        }
+
+        /// <summary>
+        /// Removes node from affinity. This method is called when it is safe to remove
+        /// disconnected node from affinity mapping.
+        /// </summary>
+        /// <param name="nodeId">The node identifier.</param>
+        public virtual void RemoveNode(Guid nodeId)
+        {
+            ThrowIfUninitialized();
+
+            _baseFunction.RemoveNode(nodeId);
+        }
+
+        /// <summary>
+        /// Gets affinity nodes for a partition. In case of replicated cache, all returned
+        /// nodes are updated in the same manner. In case of partitioned cache, the returned
+        /// list should contain only the primary and back up nodes with primary node being
+        /// always first.
+        /// <para />
+        /// Note that partitioned affinity must obey the following contract: given that node
+        /// <c>N</c> is primary for some key <c>K</c>, if any other node(s) leave
+        /// grid and no node joins grid, node <c>N</c> will remain primary for key <c>K</c>.
+        /// </summary>
+        /// <param name="context">The affinity function context.</param>
+        /// <returns>
+        /// A collection of partitions, where each partition is a collection of nodes,
+        /// where first node is a primary node, and other nodes are backup nodes.
+        /// </returns>
+        public virtual IEnumerable<IEnumerable<IClusterNode>> AssignPartitions(AffinityFunctionContext context)
+        {
+            ThrowIfUninitialized();
+
+            return _baseFunction.AssignPartitions(context);
+        }
+
+        /// <summary>
+        /// Gets or sets a value indicating whether to exclude same-host-neighbors from being backups of each other.
+        /// </summary>
+        public virtual bool ExcludeNeighbors { get; set; }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="AffinityFunctionBase"/> class.
+        /// </summary>
+        internal AffinityFunctionBase()
+        {
+            // No-op.
+        }
+
+        /// <summary>
+        /// Sets the base function.
+        /// </summary>
+        /// <param name="baseFunc">The base function.</param>
+        internal void SetBaseFunction(IAffinityFunction baseFunc)
+        {
+            _baseFunction = baseFunc;
+        }
+
+        /// <summary>
+        /// Throws an <see cref="IgniteException"/> when no base function has been set via <see cref="SetBaseFunction"/>.
+        /// </summary>
+        private void ThrowIfUninitialized()
+        {
+            if (_baseFunction == null)
+                throw new IgniteException(GetType() + " has not yet been initialized.");
+        }
+    }
+}