Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A7765200B4B for ; Thu, 7 Jul 2016 08:01:32 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A60C1160A64; Thu, 7 Jul 2016 06:01:32 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 80AB6160A81 for ; Thu, 7 Jul 2016 08:01:28 +0200 (CEST) Received: (qmail 80697 invoked by uid 500); 7 Jul 2016 06:01:27 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 79555 invoked by uid 99); 7 Jul 2016 06:01:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 Jul 2016 06:01:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E20C0EC22C; Thu, 7 Jul 2016 06:01:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Thu, 07 Jul 2016 06:01:48 -0000 Message-Id: <4041660d732b413e8edbd8146ff3e381@git.apache.org> In-Reply-To: <078d677a2cf641a7b437be012bf63361@git.apache.org> References: <078d677a2cf641a7b437be012bf63361@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [23/50] [abbrv] ignite git commit: IGNITE-3398 .NET: Allow extending predefined Affinity Functions archived-at: Thu, 07 Jul 2016 06:01:32 -0000 IGNITE-3398 .NET: Allow extending predefined Affinity Functions This closes #848 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/11d97f17 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/11d97f17 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/11d97f17 Branch: refs/heads/ignite-1232 Commit: 11d97f174db9253cf7cd1c62d3d6f86f582c2a21 Parents: b3af61f Author: Pavel Tupitsyn Authored: Tue Jul 5 15:30:41 2016 +0300 Committer: Pavel Tupitsyn Committed: Tue Jul 5 15:30:41 2016 +0300 ---------------------------------------------------------------------- .../affinity/PlatformAffinityFunction.java | 154 +++++++---- .../PlatformAffinityFunctionTarget.java | 113 ++++++++ .../cache/affinity/PlatformAffinityUtils.java | 113 ++++++++ .../callback/PlatformCallbackGateway.java | 6 +- .../callback/PlatformCallbackUtils.java | 5 +- .../PlatformDotNetConfigurationClosure.java | 8 +- .../utils/PlatformConfigurationUtils.java | 96 ++++--- .../dotnet/PlatformDotNetAffinityFunction.java | 89 +++--- .../platforms/cpp/jni/include/ignite/jni/java.h | 4 +- modules/platforms/cpp/jni/src/java.cpp | 7 +- .../Affinity/AffinityFunctionSpringTest.cs | 64 ++++- .../Cache/Affinity/AffinityFunctionTest.cs | 209 ++++++++++++-- .../Config/Cache/Affinity/affinity-function.xml | 40 +++ .../Apache.Ignite.Core.Tests/TestRunner.cs | 4 +- .../Apache.Ignite.Core.csproj | 3 + .../Cache/Affinity/AffinityFunctionBase.cs | 126 +++------ .../Cache/Affinity/AffinityFunctionContext.cs | 14 +- .../Cache/Affinity/Fair/FairAffinityFunction.cs | 4 + .../Rendezvous/RendezvousAffinityFunction.cs | 4 + .../Cache/Configuration/CacheConfiguration.cs | 5 +- .../dotnet/Apache.Ignite.Core/Ignition.cs | 6 +- .../Impl/Binary/BinaryReaderExtensions.cs | 14 + .../Impl/Binary/Marshaller.cs | 1 + .../Affinity/AffinityFunctionSerializer.cs | 277 +++++++++++++++++++ .../Cache/Affinity/PlatformAffinityFunction.cs | 74 +++++ .../Impl/Common/ObjectInfoHolder.cs | 86 ++++++ .../Apache.Ignite.Core/Impl/IgniteUtils.cs | 21 ++ .../Impl/Unmanaged/UnmanagedCallbacks.cs | 51 +--- 28 files changed, 1271 insertions(+), 327 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java index fc2496c..1e844e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java @@ -22,12 +22,8 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.cache.affinity.AffinityFunctionContext; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; -import org.apache.ignite.internal.cluster.IgniteClusterEx; -import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; import org.apache.ignite.internal.processors.platform.PlatformContext; -import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; @@ -38,7 +34,6 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -50,6 +45,15 @@ public class PlatformAffinityFunction implements AffinityFunction, Externalizabl private static final long serialVersionUID = 0L; /** */ + private static final byte FLAG_PARTITION = 1; + + /** */ + private static final byte FLAG_REMOVE_NODE = 1 << 1; + + /** */ + private static final byte FLAG_ASSIGN_PARTITIONS = 1 << 2; + + /** */ private Object userFunc; /** @@ -62,6 +66,12 @@ public class PlatformAffinityFunction implements AffinityFunction, Externalizabl private int partitions; /** */ + private AffinityFunction baseFunc; + + /** */ + private byte overrideFlags; + + /** */ private transient Ignite ignite; /** */ @@ -70,6 +80,10 @@ public class PlatformAffinityFunction implements AffinityFunction, Externalizabl /** */ private transient long ptr; + /** */ + private transient PlatformAffinityFunctionTarget baseTarget; + + /** * Ctor for serialization. * @@ -84,33 +98,45 @@ public class PlatformAffinityFunction implements AffinityFunction, Externalizabl * @param func User fun object. * @param partitions Number of partitions. */ - public PlatformAffinityFunction(Object func, int partitions) { + public PlatformAffinityFunction(Object func, int partitions, byte overrideFlags, AffinityFunction baseFunc) { userFunc = func; this.partitions = partitions; + this.overrideFlags = overrideFlags; + this.baseFunc = baseFunc; } /** - * Ctor. + * Gets the user func object. * - * @param ptr User func ptr. - * @param partitions Number of partitions. + * @return User func object. */ - public PlatformAffinityFunction(PlatformContext ctx, long ptr, int partitions) { - this.ctx = ctx; - this.ptr = ptr; - this.partitions = partitions; + public Object getUserFunc() { + return userFunc; + } - ignite = ctx.kernalContext().grid(); + /** + * Gets the base func. + * + * @return Base func. + */ + public AffinityFunction getBaseFunc() { + return baseFunc; } - /** {@inheritDoc} */ - public Object getUserFunc() { - return userFunc; + /** + * Gets the override flags. + * + * @return The override flags + */ + public byte getOverrideFlags() { + return overrideFlags; } /** {@inheritDoc} */ @Override public void reset() { - // No-op: userFunc is always in initial state (it is serialized only once on start). + // userFunc is always in initial state (it is serialized only once on start). + if (baseFunc != null) + baseFunc.reset(); } /** {@inheritDoc} */ @@ -124,6 +150,12 @@ public class PlatformAffinityFunction implements AffinityFunction, Externalizabl /** {@inheritDoc} */ @Override public int partition(Object key) { + if ((overrideFlags & FLAG_PARTITION) == 0) { + assert baseFunc != null; + + return baseFunc.partition(key); + } + assert ctx != null; assert ptr != 0; @@ -141,6 +173,12 @@ public class PlatformAffinityFunction implements AffinityFunction, Externalizabl /** {@inheritDoc} */ @Override public List> assignPartitions(AffinityFunctionContext affCtx) { + if ((overrideFlags & FLAG_ASSIGN_PARTITIONS) == 0) { + assert baseFunc != null; + + return baseFunc.assignPartitions(affCtx); + } + assert ctx != null; assert ptr != 0; assert affCtx != null; @@ -151,53 +189,41 @@ public class PlatformAffinityFunction implements AffinityFunction, Externalizabl BinaryRawWriterEx writer = ctx.writer(out); // Write previous assignment - List> prevAssignment = ((GridAffinityFunctionContextImpl)affCtx).prevAssignment(); + PlatformAffinityUtils.writeAffinityFunctionContext(affCtx, writer, ctx); - if (prevAssignment == null) - writer.writeInt(-1); - else { - writer.writeInt(prevAssignment.size()); - - for (List part : prevAssignment) - ctx.writeNodes(writer, part); - } - - // Write other props - writer.writeInt(affCtx.backups()); - ctx.writeNodes(writer, affCtx.currentTopologySnapshot()); - writer.writeLong(affCtx.currentTopologyVersion().topologyVersion()); - writer.writeInt(affCtx.currentTopologyVersion().minorTopologyVersion()); - ctx.writeEvent(writer, affCtx.discoveryEvent()); - - // Call platform out.synchronize(); - ctx.gateway().affinityFunctionAssignPartitions(ptr, outMem.pointer(), inMem.pointer()); - - PlatformInputStream in = inMem.input(); - BinaryRawReaderEx reader = ctx.reader(in); - // Read result - int partCnt = in.readInt(); - List> res = new ArrayList<>(partCnt); - IgniteClusterEx cluster = ctx.kernalContext().grid().cluster(); - - for (int i = 0; i < partCnt; i++) { - int partSize = in.readInt(); - List part = new ArrayList<>(partSize); - - for (int j = 0; j < partSize; j++) - part.add(cluster.node(reader.readUuid())); - - res.add(part); + // Call platform + // We can not restore original AffinityFunctionContext after the call to platform, + // due to DiscoveryEvent (when node leaves, we can't get it by id anymore). + // Secondly, AffinityFunctionContext can't be changed by the user. + if (baseTarget != null) + baseTarget.setCurrentAffinityFunctionContext(affCtx); + + try { + ctx.gateway().affinityFunctionAssignPartitions(ptr, outMem.pointer(), inMem.pointer()); + } + finally { + if (baseTarget != null) + baseTarget.setCurrentAffinityFunctionContext(null); } - return res; + // Read result + return PlatformAffinityUtils.readPartitionAssignment(ctx.reader(inMem), ctx); } } } /** {@inheritDoc} */ @Override public void removeNode(UUID nodeId) { + if ((overrideFlags & FLAG_REMOVE_NODE) == 0) { + assert baseFunc != null; + + baseFunc.removeNode(nodeId); + + return; + } + assert ctx != null; assert ptr != 0; @@ -217,16 +243,24 @@ public class PlatformAffinityFunction implements AffinityFunction, Externalizabl @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(userFunc); out.writeInt(partitions); + out.writeByte(overrideFlags); + out.writeObject(baseFunc); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { userFunc = in.readObject(); partitions = in.readInt(); + overrideFlags = in.readByte(); + baseFunc = (AffinityFunction)in.readObject(); } /** {@inheritDoc} */ @Override public void start() throws IgniteException { + // userFunc is null when there is nothing overridden + if (userFunc == null) + return; + assert ignite != null; ctx = PlatformUtils.platformContext(ignite); assert ctx != null; @@ -239,12 +273,19 @@ public class PlatformAffinityFunction implements AffinityFunction, Externalizabl out.synchronize(); - ptr = ctx.gateway().affinityFunctionInit(mem.pointer()); + baseTarget = baseFunc != null + ? new PlatformAffinityFunctionTarget(ctx, baseFunc) + : null; + + ptr = ctx.gateway().affinityFunctionInit(mem.pointer(), baseTarget); } } /** {@inheritDoc} */ @Override public void stop() throws IgniteException { + if (ptr == 0) + return; + assert ctx != null; ctx.gateway().affinityFunctionDestroy(ptr); @@ -255,8 +296,9 @@ public class PlatformAffinityFunction implements AffinityFunction, Externalizabl * * @param ignite Ignite. */ + @SuppressWarnings("unused") @IgniteInstanceResource - private void setIgnite(Ignite ignite) { + public void setIgnite(Ignite ignite) { this.ignite = ignite; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java new file mode 100644 index 0000000..d83dd08 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache.affinity; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.binary.BinaryRawReaderEx; +import org.apache.ignite.internal.binary.BinaryRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.util.typedef.internal.U; + +import java.util.List; + +/** + * Platform affinity function target: + * to be invoked when Platform function calls base implementation of one of the AffinityFunction methods. + */ +public class PlatformAffinityFunctionTarget extends PlatformAbstractTarget { + /** */ + private static final int OP_PARTITION = 1; + + /** */ + private static final int OP_REMOVE_NODE = 2; + + /** */ + private static final int OP_ASSIGN_PARTITIONS = 3; + + /** Inner function to delegate calls to. */ + private final AffinityFunction baseFunc; + + /** Thread local to hold the current affinity function context. */ + private static final ThreadLocal currentAffCtx = new ThreadLocal<>(); + + /** + * Constructor. + * + * @param platformCtx Context. + * @param baseFunc Function to wrap. + */ + protected PlatformAffinityFunctionTarget(PlatformContext platformCtx, AffinityFunction baseFunc) { + super(platformCtx); + + assert baseFunc != null; + this.baseFunc = baseFunc; + + try { + platformCtx.kernalContext().resource().injectGeneric(baseFunc); + } catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + if (type == OP_PARTITION) + return baseFunc.partition(reader.readObjectDetached()); + else if (type == OP_REMOVE_NODE) { + baseFunc.removeNode(reader.readUuid()); + return 0; + } + + return super.processInStreamOutLong(type, reader); + } + + /** {@inheritDoc} */ + @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + if (type == OP_ASSIGN_PARTITIONS) { + AffinityFunctionContext affCtx = currentAffCtx.get(); + + if (affCtx == null) + throw new IgniteException("Thread-local AffinityFunctionContext is null. " + + "This may indicate an unsupported call to the base AffinityFunction"); + + final List> partitions = baseFunc.assignPartitions(affCtx); + + PlatformAffinityUtils.writePartitionAssignment(partitions, writer, platformContext()); + + return; + } + + super.processOutStream(type, writer); + } + + + + /** + * Sets the context for current operation. + * + * @param ctx Context. + */ + void setCurrentAffinityFunctionContext(AffinityFunctionContext ctx) { + currentAffCtx.set(ctx); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityUtils.java new file mode 100644 index 0000000..6d14cab --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityUtils.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache.affinity; + +import org.apache.ignite.binary.BinaryRawReader; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.binary.BinaryRawWriterEx; +import org.apache.ignite.internal.cluster.IgniteClusterEx; +import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.jetbrains.annotations.NotNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * Affinity serialization functions. + */ +public class PlatformAffinityUtils { + /** + * Writes the affinity function context. + * @param affCtx Affinity context. + * @param writer Writer. + * @param ctx Platform context. + */ + public static void writeAffinityFunctionContext(AffinityFunctionContext affCtx, BinaryRawWriterEx writer, + PlatformContext ctx) { + assert affCtx != null; + assert writer != null; + assert ctx != null; + + ctx.writeNodes(writer, affCtx.currentTopologySnapshot()); + writer.writeInt(affCtx.backups()); + writer.writeLong(affCtx.currentTopologyVersion().topologyVersion()); + writer.writeInt(affCtx.currentTopologyVersion().minorTopologyVersion()); + ctx.writeEvent(writer, affCtx.discoveryEvent()); + + // Write previous assignment + List> prevAssignment = ((GridAffinityFunctionContextImpl)affCtx).prevAssignment(); + + if (prevAssignment == null) + writer.writeInt(-1); + else { + writer.writeInt(prevAssignment.size()); + + for (List part : prevAssignment) + ctx.writeNodes(writer, part); + } + } + + /** + * Writes the partition assignment to a stream. + * + * @param partitions Partitions. + * @param writer Writer. + */ + public static void writePartitionAssignment(Collection> partitions, BinaryRawWriterEx writer, + PlatformContext ctx) { + assert partitions != null; + assert writer != null; + + writer.writeInt(partitions.size()); + + for (List part : partitions) + ctx.writeNodes(writer, part); + } + + /** + * Reads the partition assignment. + * + * @param reader Reader. + * @param ctx Platform context. + * @return Partitions. + */ + @NotNull + public static List> readPartitionAssignment(BinaryRawReader reader, PlatformContext ctx) { + assert reader != null; + assert ctx != null; + + int partCnt = reader.readInt(); + List> res = new ArrayList<>(partCnt); + IgniteClusterEx cluster = ctx.kernalContext().grid().cluster(); + + for (int i = 0; i < partCnt; i++) { + int partSize = reader.readInt(); + List part = new ArrayList<>(partSize); + + for (int j = 0; j < partSize; j++) + part.add(cluster.node(reader.readUuid())); + + res.add(part); + } + + return res; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java index 3708e8f..88532fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.platform.callback; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunctionTarget; import org.apache.ignite.internal.util.GridStripedSpinBusyLock; /** @@ -954,13 +955,14 @@ public class PlatformCallbackGateway { * Initializes affinity function. * * @param memPtr Pointer to a stream with serialized affinity function. + * @param baseFunc Optional func for base calls. * @return Affinity function pointer. */ - public long affinityFunctionInit(long memPtr) { + public long affinityFunctionInit(long memPtr, PlatformAffinityFunctionTarget baseFunc) { enter(); try { - return PlatformCallbackUtils.affinityFunctionInit(envPtr, memPtr); + return PlatformCallbackUtils.affinityFunctionInit(envPtr, memPtr, baseFunc); } finally { leave(); http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java index d19782d..7b36e5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.platform.callback; +import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunctionTarget; + /** * Platform callback utility methods. Implemented in target platform. All methods in this class must be * package-visible and invoked only through {@link PlatformCallbackGateway}. @@ -500,9 +502,10 @@ public class PlatformCallbackUtils { * * @param envPtr Environment pointer. * @param memPtr Pointer to a stream with serialized affinity function. + * @param baseFunc Optional func for base calls. * @return Affinity function pointer. */ - static native long affinityFunctionInit(long envPtr, long memPtr); + static native long affinityFunctionInit(long envPtr, long memPtr, PlatformAffinityFunctionTarget baseFunc); /** * Gets the partition from affinity function. http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java index 0a267fb..cb9696c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java @@ -197,8 +197,10 @@ public class PlatformDotNetConfigurationClosure extends PlatformAbstractConfigur writer.writeInt(affFuncs.size()); - for (PlatformDotNetAffinityFunction func : affFuncs) - func.write(writer); + for (PlatformDotNetAffinityFunction func : affFuncs) { + writer.writeString(func.getTypeName()); + writer.writeMap(func.getProperties()); + } out.synchronize(); @@ -258,7 +260,7 @@ public class PlatformDotNetConfigurationClosure extends PlatformAbstractConfigur if (!affFuncs.isEmpty()) { for (PlatformDotNetAffinityFunction aff : affFuncs) - aff.initPartitions(in.readInt()); + aff.init(PlatformConfigurationUtils.readAffinityFunction(in)); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java index 7353f08..1d2baf2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java @@ -42,10 +42,7 @@ import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.internal.binary.*; import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunction; -import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryConfiguration; -import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryTypeConfiguration; -import org.apache.ignite.platform.dotnet.PlatformDotNetCacheStoreFactoryNative; -import org.apache.ignite.platform.dotnet.PlatformDotNetConfiguration; +import org.apache.ignite.platform.dotnet.*; import org.apache.ignite.spi.communication.CommunicationSpi; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiMBean; @@ -238,32 +235,39 @@ public class PlatformConfigurationUtils { * @param in Stream. * @return Affinity function. */ - private static AffinityFunction readAffinityFunction(BinaryRawReaderEx in) { + public static PlatformAffinityFunction readAffinityFunction(BinaryRawReaderEx in) { byte plcTyp = in.readByte(); + if (plcTyp == 0) + return null; + + int partitions = in.readInt(); + boolean exclNeighbours = in.readBoolean(); + byte overrideFlags = in.readByte(); + Object userFunc = in.readObjectDetached(); + + AffinityFunction baseFunc = null; + switch (plcTyp) { - case 0: - break; case 1: { FairAffinityFunction f = new FairAffinityFunction(); - f.setPartitions(in.readInt()); - f.setExcludeNeighbors(in.readBoolean()); - return f; + f.setPartitions(partitions); + f.setExcludeNeighbors(exclNeighbours); + baseFunc = f; + break; } case 2: { RendezvousAffinityFunction f = new RendezvousAffinityFunction(); - f.setPartitions(in.readInt()); - f.setExcludeNeighbors(in.readBoolean()); - return f; - } - case 3: { - return new PlatformAffinityFunction(in.readObjectDetached(), in.readInt()); + f.setPartitions(partitions); + f.setExcludeNeighbors(exclNeighbours); + baseFunc = f; + break; } default: - assert false; + assert plcTyp == 3; } - return null; + return new PlatformAffinityFunction(userFunc, partitions, overrideFlags, baseFunc); } /** @@ -281,34 +285,56 @@ public class PlatformConfigurationUtils { } /** - * Writes the eviction policy. + * Writes the affinity functions. + * * @param out Stream. * @param f Affinity. */ private static void writeAffinityFunction(BinaryRawWriter out, AffinityFunction f) { + if (f instanceof PlatformDotNetAffinityFunction) + f = ((PlatformDotNetAffinityFunction)f).getFunc(); + if (f instanceof FairAffinityFunction) { - out.writeByte((byte)1); + out.writeByte((byte) 1); - FairAffinityFunction f0 = (FairAffinityFunction)f; + FairAffinityFunction f0 = (FairAffinityFunction) f; out.writeInt(f0.getPartitions()); out.writeBoolean(f0.isExcludeNeighbors()); - } - else if (f instanceof RendezvousAffinityFunction) { - out.writeByte((byte)2); + out.writeByte((byte) 0); // override flags + out.writeObject(null); // user func + } else if (f instanceof RendezvousAffinityFunction) { + out.writeByte((byte) 2); - RendezvousAffinityFunction f0 = (RendezvousAffinityFunction)f; + RendezvousAffinityFunction f0 = (RendezvousAffinityFunction) f; out.writeInt(f0.getPartitions()); out.writeBoolean(f0.isExcludeNeighbors()); - } - else if (f instanceof PlatformAffinityFunction) { - out.writeByte((byte)3); - - PlatformAffinityFunction f0 = (PlatformAffinityFunction)f; - out.writeObject(f0.getUserFunc()); - out.writeInt(f.partitions()); - } - else { - out.writeByte((byte)0); + out.writeByte((byte) 0); // override flags + out.writeObject(null); // user func + } else if (f instanceof PlatformAffinityFunction) { + PlatformAffinityFunction f0 = (PlatformAffinityFunction) f; + AffinityFunction baseFunc = f0.getBaseFunc(); + + if (baseFunc instanceof FairAffinityFunction) { + out.writeByte((byte) 1); + out.writeInt(f0.partitions()); + out.writeBoolean(((FairAffinityFunction) baseFunc).isExcludeNeighbors()); + out.writeByte(f0.getOverrideFlags()); + out.writeObject(f0.getUserFunc()); + } else if (baseFunc instanceof RendezvousAffinityFunction) { + out.writeByte((byte) 2); + out.writeInt(f0.partitions()); + out.writeBoolean(((RendezvousAffinityFunction) baseFunc).isExcludeNeighbors()); + out.writeByte(f0.getOverrideFlags()); + out.writeObject(f0.getUserFunc()); + } else { + out.writeByte((byte) 3); + out.writeInt(f0.partitions()); + out.writeBoolean(false); // exclude neighbors + out.writeByte(f0.getOverrideFlags()); + out.writeObject(f0.getUserFunc()); + } + } else { + out.writeByte((byte) 0); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/core/src/main/java/org/apache/ignite/platform/dotnet/PlatformDotNetAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/platform/dotnet/PlatformDotNetAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/platform/dotnet/PlatformDotNetAffinityFunction.java index 6642693..483fd22 100644 --- a/modules/core/src/main/java/org/apache/ignite/platform/dotnet/PlatformDotNetAffinityFunction.java +++ b/modules/core/src/main/java/org/apache/ignite/platform/dotnet/PlatformDotNetAffinityFunction.java @@ -48,25 +48,13 @@ public class PlatformDotNetAffinityFunction implements AffinityFunction, Externa private static final long serialVersionUID = 0L; /** .NET type name. */ - private String typName; + private transient String typName; /** Properties. */ - private Map props; - - /** - * Partition count. - * - * 1) Java calls partitions() method very early (before LifecycleAware.start) during CacheConfiguration validation. - * 2) Partition count never changes. - * Therefore, we get the value on .NET side once, and pass it along with PlatformAffinity. - */ - private int partitions; + private transient Map props; /** Inner function. */ - private transient PlatformAffinityFunction func; - - /** Ignite. */ - private transient Ignite ignite; + private PlatformAffinityFunction func; /** * Gets .NET type name. @@ -113,7 +101,9 @@ public class PlatformDotNetAffinityFunction implements AffinityFunction, Externa /** {@inheritDoc} */ @Override public int partitions() { - return partitions; + assert func != null; + + return func.partitions(); } /** {@inheritDoc} */ @@ -137,66 +127,48 @@ public class PlatformDotNetAffinityFunction implements AffinityFunction, Externa func.removeNode(nodeId); } - /** - * Writes this func to the writer. - * - * @param writer Writer. - */ - public void write(BinaryRawWriter writer) { - assert writer != null; - - writer.writeObject(typName); - writer.writeMap(props); - } - /** {@inheritDoc} */ @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(typName); - out.writeObject(props); - out.writeInt(partitions); + out.writeObject(func); } /** {@inheritDoc} */ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - typName = (String)in.readObject(); - props = (Map)in.readObject(); - partitions = in.readInt(); + func = (PlatformAffinityFunction) in.readObject(); } /** - * Initializes the partitions count. + * Initializes this instance. * - * @param partitions Number of partitions. + * @param func Underlying func. */ - public void initPartitions(int partitions) { - this.partitions = partitions; + public void init(PlatformAffinityFunction func) { + assert func != null; + + this.func = func; } /** {@inheritDoc} */ @Override public void start() throws IgniteException { - assert ignite != null; - - PlatformContext ctx = PlatformUtils.platformContext(ignite); - assert ctx != null; - - try (PlatformMemory mem = ctx.memory().allocate()) { - PlatformOutputStream out = mem.output(); - BinaryRawWriterEx writer = ctx.writer(out); - - write(writer); - - out.synchronize(); - - long ptr = ctx.gateway().affinityFunctionInit(mem.pointer()); + assert func != null; - func = new PlatformAffinityFunction(ctx, ptr, partitions); - } + func.start(); } /** {@inheritDoc} */ @Override public void stop() throws IgniteException { - if (func != null) - func.stop(); + assert func != null; + + func.stop(); + } + + /** + * Gets the inner func. + * + * @return The inner func. + */ + public PlatformAffinityFunction getFunc() { + return func; } /** @@ -204,8 +176,11 @@ public class PlatformDotNetAffinityFunction implements AffinityFunction, Externa * * @param ignite Ignite. */ + @SuppressWarnings("unused") @IgniteInstanceResource private void setIgnite(Ignite ignite) { - this.ignite = ignite; + assert func != null; + + func.setIgnite(ignite); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/platforms/cpp/jni/include/ignite/jni/java.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/include/ignite/jni/java.h b/modules/platforms/cpp/jni/include/ignite/jni/java.h index 98779c4..13e6e8d 100644 --- a/modules/platforms/cpp/jni/include/ignite/jni/java.h +++ b/modules/platforms/cpp/jni/include/ignite/jni/java.h @@ -100,7 +100,7 @@ namespace ignite typedef void(JNICALL *OnClientDisconnectedHandler)(void* target); typedef void(JNICALL *OnClientReconnectedHandler)(void* target, unsigned char clusterRestarted); - typedef long long(JNICALL *AffinityFunctionInitHandler)(void* target, long long memPtr); + typedef long long(JNICALL *AffinityFunctionInitHandler)(void* target, long long memPtr, void* baseFunc); typedef int(JNICALL *AffinityFunctionPartitionHandler)(void* target, long long ptr, long long memPtr); typedef void(JNICALL *AffinityFunctionAssignPartitionsHandler)(void* target, long long ptr, long long inMemPtr, long long outMemPtr); typedef void(JNICALL *AffinityFunctionRemoveNodeHandler)(void* target, long long ptr, long long memPtr); @@ -752,7 +752,7 @@ namespace ignite JNIEXPORT void JNICALL JniOnClientDisconnected(JNIEnv *env, jclass cls, jlong envPtr); JNIEXPORT void JNICALL JniOnClientReconnected(JNIEnv *env, jclass cls, jlong envPtr, jboolean clusterRestarted); - JNIEXPORT jlong JNICALL JniAffinityFunctionInit(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr); + JNIEXPORT jlong JNICALL JniAffinityFunctionInit(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr, jobject baseFunc); JNIEXPORT jint JNICALL JniAffinityFunctionPartition(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr); JNIEXPORT void JNICALL JniAffinityFunctionAssignPartitions(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong inMemPtr, jlong outMemPtr); JNIEXPORT void JNICALL JniAffinityFunctionRemoveNode(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr); http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/platforms/cpp/jni/src/java.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/src/java.cpp b/modules/platforms/cpp/jni/src/java.cpp index 577ee26..87f1e03 100644 --- a/modules/platforms/cpp/jni/src/java.cpp +++ b/modules/platforms/cpp/jni/src/java.cpp @@ -363,7 +363,7 @@ namespace ignite JniMethod M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_DISCONNECTED = JniMethod("onClientDisconnected", "(J)V", true); JniMethod M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_RECONNECTED = JniMethod("onClientReconnected", "(JZ)V", true); - JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT = JniMethod("affinityFunctionInit", "(JJ)J", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT = JniMethod("affinityFunctionInit", "(JJLorg/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget;)J", true); JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_PARTITION = JniMethod("affinityFunctionPartition", "(JJJ)I", true); JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_ASSIGN_PARTITIONS = JniMethod("affinityFunctionAssignPartitions", "(JJJJ)V", true); JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_REMOVE_NODE = JniMethod("affinityFunctionRemoveNode", "(JJJ)V", true); @@ -2847,8 +2847,9 @@ namespace ignite IGNITE_SAFE_PROC(env, envPtr, OnClientReconnectedHandler, onClientReconnected, clusterRestarted); } - JNIEXPORT jlong JNICALL JniAffinityFunctionInit(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr) { - IGNITE_SAFE_FUNC(env, envPtr, AffinityFunctionInitHandler, affinityFunctionInit, memPtr); + JNIEXPORT jlong JNICALL JniAffinityFunctionInit(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr, jobject baseFunc) { + void* baseFuncRef = baseFunc ? env->NewGlobalRef(baseFunc) : nullptr; + IGNITE_SAFE_FUNC(env, envPtr, AffinityFunctionInitHandler, affinityFunctionInit, memPtr, baseFuncRef); } JNIEXPORT jint JNICALL JniAffinityFunctionPartition(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr) { http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionSpringTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionSpringTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionSpringTest.cs index 33c0ce1..a1e2ad8 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionSpringTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionSpringTest.cs @@ -24,6 +24,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Affinity using System.Linq; using Apache.Ignite.Core.Cache; using Apache.Ignite.Core.Cache.Affinity; + using Apache.Ignite.Core.Cache.Affinity.Fair; using Apache.Ignite.Core.Cluster; using Apache.Ignite.Core.Resource; using NUnit.Framework; @@ -39,7 +40,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Affinity /// /// Initializes a new instance of the class. /// - public AffinityFunctionSpringTest() : base(3, "config\\cache\\affinity\\affinity-function.xml") + public AffinityFunctionSpringTest() : base(6, "config\\cache\\affinity\\affinity-function.xml") { // No-op. } @@ -65,6 +66,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Affinity { ValidateAffinityFunction(Grid.GetCache("cache1")); ValidateAffinityFunction(_ignite.GetCache("cache1")); + ValidateAffinityFunction(Grid.GetCache("cache2")); + ValidateAffinityFunction(_ignite.GetCache("cache2")); } /// @@ -78,6 +81,12 @@ namespace Apache.Ignite.Core.Tests.Cache.Affinity ValidateAffinityFunction(_ignite.CreateCache("dyn-cache-2")); ValidateAffinityFunction(Grid.GetCache("dyn-cache-2")); + + ValidateAffinityFunction(Grid.CreateCache("dyn-cache2-1")); + ValidateAffinityFunction(_ignite.GetCache("dyn-cache2-1")); + + ValidateAffinityFunction(_ignite.CreateCache("dyn-cache2-2")); + ValidateAffinityFunction(Grid.GetCache("dyn-cache2-2")); } /// @@ -86,23 +95,32 @@ namespace Apache.Ignite.Core.Tests.Cache.Affinity /// The cache. private static void ValidateAffinityFunction(ICache cache) { - Assert.IsNull(cache.GetConfiguration().AffinityFunction); - var aff = cache.Ignite.GetAffinity(cache.Name); Assert.AreEqual(5, aff.Partitions); Assert.AreEqual(4, aff.GetPartition(2)); Assert.AreEqual(3, aff.GetPartition(4)); + + var func = (ITestFunc) cache.GetConfiguration().AffinityFunction; + Assert.AreEqual(5, func.Partitions); + Assert.AreEqual(1, func.Property1); + Assert.AreEqual("1", func.Property2); + } + + private interface ITestFunc : IAffinityFunction + { + int Property1 { get; set; } + + string Property2 { get; set; } } - [Serializable] - private class TestFunc : IAffinityFunction + private class TestFunc : ITestFunc // [Serializable] is not necessary { [InstanceResource] private readonly IIgnite _ignite = null; - private int Property1 { get; set; } + public int Property1 { get; set; } - private string Property2 { get; set; } + public string Property2 { get; set; } public int Partitions { @@ -128,5 +146,37 @@ namespace Apache.Ignite.Core.Tests.Cache.Affinity return Enumerable.Range(0, Partitions).Select(x => context.CurrentTopologySnapshot); } } + + private class TestFairFunc : FairAffinityFunction, ITestFunc // [Serializable] is not necessary + { + [InstanceResource] + private readonly IIgnite _ignite = null; + + public int Property1 { get; set; } + + public string Property2 { get; set; } + + public override int GetPartition(object key) + { + Assert.IsNotNull(_ignite); + Assert.AreEqual(1, Property1); + Assert.AreEqual("1", Property2); + + var basePart = base.GetPartition(key); + Assert.Greater(basePart, -1); + Assert.Less(basePart, Partitions); + + return (int) key * 2 % 5; + } + + public override IEnumerable> AssignPartitions(AffinityFunctionContext context) + { + var baseRes = base.AssignPartitions(context).ToList(); // test base call + + Assert.AreEqual(Partitions, baseRes.Count); + + return baseRes; + } + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionTest.cs index ed0a95b..f38cb3e 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionTest.cs @@ -21,9 +21,11 @@ namespace Apache.Ignite.Core.Tests.Cache.Affinity using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; + using System.Threading; using Apache.Ignite.Core.Cache; using Apache.Ignite.Core.Cache.Affinity; using Apache.Ignite.Core.Cache.Affinity.Fair; + using Apache.Ignite.Core.Cache.Affinity.Rendezvous; using Apache.Ignite.Core.Cache.Configuration; using Apache.Ignite.Core.Cluster; using Apache.Ignite.Core.Common; @@ -45,6 +47,12 @@ namespace Apache.Ignite.Core.Tests.Cache.Affinity private const string CacheName = "cache"; /** */ + private const string CacheNameFair = "cacheFair"; + + /** */ + private const string CacheNameRendezvous = "cacheRendezvous"; + + /** */ private const int PartitionCount = 10; /** */ @@ -68,6 +76,14 @@ namespace Apache.Ignite.Core.Tests.Cache.Affinity { AffinityFunction = new SimpleAffinityFunction(), Backups = 7 + }, + new CacheConfiguration(CacheNameFair) + { + AffinityFunction = new FairAffinityFunctionEx {Foo = 25} + }, + new CacheConfiguration(CacheNameRendezvous) + { + AffinityFunction = new RendezvousAffinityFunctionEx {Bar = "test"} } } }; @@ -84,8 +100,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Affinity public void FixtureTearDown() { // Check that affinity handles are present - TestUtils.AssertHandleRegistryHasItems(_ignite, _ignite.GetCacheNames().Count, 0); - TestUtils.AssertHandleRegistryHasItems(_ignite2, _ignite.GetCacheNames().Count, 0); + TestUtils.AssertHandleRegistryHasItems(_ignite, _ignite.GetCacheNames().Count - 1, 0); + TestUtils.AssertHandleRegistryHasItems(_ignite2, _ignite.GetCacheNames().Count - 1, 0); // Destroy all caches _ignite.GetCacheNames().ToList().ForEach(_ignite.DestroyCache); @@ -184,6 +200,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Affinity // Called on both nodes TestUtils.WaitForCondition(() => RemovedNodes.Count > 0, 3000); + Assert.GreaterOrEqual(RemovedNodes.Count, 6); Assert.AreEqual(expectedNodeId, RemovedNodes.Distinct().Single()); } @@ -218,22 +235,94 @@ namespace Apache.Ignite.Core.Tests.Cache.Affinity } /// - /// Tests user-defined function that inherits predefined function. + /// Tests customized fair affinity. /// [Test] - public void TestInheritPredefinedFunction() + public void TestInheritFairAffinity() { - var ex = Assert.Throws(() => - _ignite.CreateCache( - new CacheConfiguration("failCache3") - { - AffinityFunction = new FairAffinityFunctionInheritor() - })); + Assert.Greater(FairAffinityFunctionEx.AssignCount, 2); + + var caches = new[] + { + _ignite.GetCache(CacheNameFair), + _ignite.CreateCache(new CacheConfiguration(CacheNameFair + "2") + { + AffinityFunction = new FairAffinityFunctionEx {Foo = 25} + }) + }; + + foreach (var cache in caches) + { + var aff = _ignite.GetAffinity(cache.Name); + + Assert.AreEqual(PartitionCount, aff.Partitions); + + // Test from map + Assert.AreEqual(2, aff.GetPartition(1)); + Assert.AreEqual(3, aff.GetPartition(2)); + + // Test from base func + Assert.AreEqual(6, aff.GetPartition(33)); + + // Check config + var func = (FairAffinityFunctionEx) cache.GetConfiguration().AffinityFunction; + Assert.AreEqual(25, func.Foo); + } + } + + /// + /// Tests customized rendezvous affinity. + /// + [Test] + public void TestInheritRendezvousAffinity() + { + Assert.Greater(RendezvousAffinityFunctionEx.AssignCount, 2); - Assert.AreEqual("User-defined AffinityFunction can not inherit from " + - "Apache.Ignite.Core.Cache.Affinity.AffinityFunctionBase: " + - "Apache.Ignite.Core.Tests.Cache.Affinity.AffinityFunctionTest" + - "+FairAffinityFunctionInheritor", ex.Message); + var caches = new[] + { + _ignite.GetCache(CacheNameRendezvous), + _ignite.CreateCache(new CacheConfiguration(CacheNameRendezvous + "2") + { + AffinityFunction = new RendezvousAffinityFunctionEx {Bar = "test"} + }) + }; + + foreach (var cache in caches) + { + var aff = _ignite.GetAffinity(cache.Name); + + Assert.AreEqual(PartitionCount, aff.Partitions); + + // Test from map + Assert.AreEqual(3, aff.GetPartition(1)); + Assert.AreEqual(4, aff.GetPartition(2)); + + // Test from base func + Assert.AreEqual(2, aff.GetPartition(42)); + + // Check config + var func = (RendezvousAffinityFunctionEx)cache.GetConfiguration().AffinityFunction; + Assert.AreEqual("test", func.Bar); + } + } + + /// + /// Tests the AffinityFunction with simple inheritance: none of the methods are overridden, + /// so there are no callbacks, and user object is not passed over the wire. + /// + [Test] + public void TestSimpleInheritance() + { + var cache = _ignite.CreateCache(new CacheConfiguration("simpleInherit") + { + AffinityFunction = new SimpleOverride() + }); + + var aff = _ignite.GetAffinity(cache.Name); + + Assert.AreEqual(PartitionCount, aff.Partitions); + Assert.AreEqual(6, aff.GetPartition(33)); + Assert.AreEqual(4, aff.GetPartition(34)); } [Serializable] @@ -300,9 +389,97 @@ namespace Apache.Ignite.Core.Tests.Cache.Affinity } [Serializable] - private class FairAffinityFunctionInheritor : FairAffinityFunction + private class FairAffinityFunctionEx : FairAffinityFunction { - // No-op. + public static int AssignCount; + + private static readonly Dictionary PartitionMap = new Dictionary {{1, 2}, {2, 3}}; + + public override int Partitions + { + get { return PartitionCount; } + set { Assert.AreEqual(Partitions, value); } + } + + public int Foo { get; set; } + + public override int GetPartition(object key) + { + int res; + + if (PartitionMap.TryGetValue((int)key, out res)) + return res; + + return base.GetPartition(key); + } + + public override void RemoveNode(Guid nodeId) + { + RemovedNodes.Add(nodeId); + } + + public override IEnumerable> AssignPartitions(AffinityFunctionContext context) + { + var res = base.AssignPartitions(context).Reverse(); + + Interlocked.Increment(ref AssignCount); + + return res; + } + } + + [Serializable] + private class RendezvousAffinityFunctionEx : RendezvousAffinityFunction + { + public static int AssignCount; + + private static readonly Dictionary PartitionMap = new Dictionary {{1, 3}, {2, 4}}; + + public override int Partitions + { + get { return PartitionCount; } + set { Assert.AreEqual(Partitions, value); } + } + + public string Bar { get; set; } + + public override int GetPartition(object key) + { + int res; + + if (PartitionMap.TryGetValue((int)key, out res)) + return res; + + return base.GetPartition(key); + } + + public override void RemoveNode(Guid nodeId) + { + RemovedNodes.Add(nodeId); + } + + public override IEnumerable> AssignPartitions(AffinityFunctionContext context) + { + var res = base.AssignPartitions(context).Reverse(); + + Interlocked.Increment(ref AssignCount); + + return res; + } + } + + /// + /// Override only properties, so this func won't be passed over the wire. + /// + private class SimpleOverride : FairAffinityFunction + { + public override int Partitions + { + get { return PartitionCount; } + set { throw new NotSupportedException(); } + } + + public override bool ExcludeNeighbors { get; set; } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function.xml ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function.xml index e7fc516..0be07f6 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function.xml +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function.xml @@ -64,6 +64,46 @@ + + + + + + + + + + + 1 + + + + 5 + + + + + + + + + + + + + + + + 1 + + + + 5 + + + + + http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs index 8f11122..149fa35 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs @@ -48,9 +48,9 @@ namespace Apache.Ignite.Core.Tests return; } - //TestOne(typeof(BinaryStringTest), "Test"); + TestOne(typeof(AffinityFunctionTest), "TestSimpleInheritance"); - TestAll(typeof (AffinityFunctionSpringTest)); + //TestAll(typeof (AffinityFunctionTest)); //TestAllInAssembly(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index e7f772f..57f89a6 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -164,6 +164,9 @@ + + + http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs index 3434384..ce2e5e1 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs @@ -20,37 +20,34 @@ namespace Apache.Ignite.Core.Cache.Affinity using System; using System.Collections.Generic; using System.ComponentModel; - using Apache.Ignite.Core.Binary; - using Apache.Ignite.Core.Cache.Affinity.Fair; - using Apache.Ignite.Core.Cache.Affinity.Rendezvous; using Apache.Ignite.Core.Cluster; using Apache.Ignite.Core.Common; /// /// Base class for predefined affinity functions. /// + [Serializable] public abstract class AffinityFunctionBase : IAffinityFunction { - /** */ - private const byte TypeCodeNull = 0; - - /** */ - private const byte TypeCodeFair = 1; + /// The default value for property. + public const int DefaultPartitions = 1024; /** */ - private const byte TypeCodeRendezvous = 2; + private int _partitions = DefaultPartitions; /** */ - private const byte TypeCodeUser = 3; + private IAffinityFunction _baseFunction; - /// The default value for property. - public const int DefaultPartitions = 1024; /// /// Gets or sets the total number of partitions. /// [DefaultValue(DefaultPartitions)] - public int Partitions { get; set; } + public virtual int Partitions + { + get { return _partitions; } + set { _partitions = value; } + } /// /// Gets partition number for a given key starting from 0. Partitioned caches @@ -67,9 +64,11 @@ namespace Apache.Ignite.Core.Cache.Affinity /// /// Partition number for a given key. /// - public int GetPartition(object key) + public virtual int GetPartition(object key) { - throw GetDirectUsageError(); + ThrowIfUninitialized(); + + return _baseFunction.GetPartition(key); } /// @@ -77,9 +76,11 @@ namespace Apache.Ignite.Core.Cache.Affinity /// disconnected node from affinity mapping. /// /// The node identifier. - public void RemoveNode(Guid nodeId) + public virtual void RemoveNode(Guid nodeId) { - throw GetDirectUsageError(); + ThrowIfUninitialized(); + + _baseFunction.RemoveNode(nodeId); } /// @@ -97,107 +98,42 @@ namespace Apache.Ignite.Core.Cache.Affinity /// A collection of partitions, where each partition is a collection of nodes, /// where first node is a primary node, and other nodes are backup nodes. /// - public IEnumerable> AssignPartitions(AffinityFunctionContext context) + public virtual IEnumerable> AssignPartitions(AffinityFunctionContext context) { - throw GetDirectUsageError(); + ThrowIfUninitialized(); + + return _baseFunction.AssignPartitions(context); } /// /// Gets or sets a value indicating whether to exclude same-host-neighbors from being backups of each other. /// - public bool ExcludeNeighbors { get; set; } + public virtual bool ExcludeNeighbors { get; set; } /// /// Initializes a new instance of the class. /// internal AffinityFunctionBase() { - Partitions = DefaultPartitions; + // No-op. } /// - /// Reads the instance. + /// Sets the base function. /// - internal static IAffinityFunction Read(IBinaryRawReader reader) + /// The base function. + internal void SetBaseFunction(IAffinityFunction baseFunc) { - AffinityFunctionBase fun; - - var typeCode = reader.ReadByte(); - switch (typeCode) - { - case TypeCodeNull: - return null; - case TypeCodeFair: - fun = new FairAffinityFunction(); - break; - case TypeCodeRendezvous: - fun = new RendezvousAffinityFunction(); - break; - case TypeCodeUser: - var f = reader.ReadObject(); - reader.ReadInt(); // skip partition count - - return f; - default: - throw new InvalidOperationException("Invalid AffinityFunction type code: " + typeCode); - } - - fun.Partitions = reader.ReadInt(); - fun.ExcludeNeighbors = reader.ReadBoolean(); - - return fun; - } - - /// - /// Writes the instance. - /// - internal static void Write(IBinaryRawWriter writer, IAffinityFunction fun) - { - if (fun == null) - { - writer.WriteByte(TypeCodeNull); - return; - } - - var p = fun as AffinityFunctionBase; - - if (p != null) - { - ValidateAffinityFunctionType(p.GetType()); - writer.WriteByte(p is FairAffinityFunction ? TypeCodeFair : TypeCodeRendezvous); - writer.WriteInt(p.Partitions); - writer.WriteBoolean(p.ExcludeNeighbors); - } - else - { - writer.WriteByte(TypeCodeUser); - - if (!fun.GetType().IsSerializable) - throw new IgniteException("AffinityFunction should be serializable."); - - writer.WriteObject(fun); - writer.WriteInt(fun.Partitions); // partition count is written once and can not be changed. - } - } - - /// - /// Validates the type of the affinity function. - /// - private static void ValidateAffinityFunctionType(Type funcType) - { - if (funcType == typeof(FairAffinityFunction) || funcType == typeof(RendezvousAffinityFunction)) - return; - - throw new IgniteException(string.Format("User-defined AffinityFunction can not inherit from {0}: {1}", - typeof(AffinityFunctionBase), funcType)); + _baseFunction = baseFunc; } /// /// Gets the direct usage error. /// - private Exception GetDirectUsageError() + private void ThrowIfUninitialized() { - return new IgniteException(GetType() + " can not be used directly."); + if (_baseFunction == null) + throw new IgniteException(GetType() + " has not yet been initialized."); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs index 1f44d8c..6067af4 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs @@ -18,6 +18,7 @@ namespace Apache.Ignite.Core.Cache.Affinity { using System.Collections.Generic; + using System.Diagnostics; using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Cluster; using Apache.Ignite.Core.Events; @@ -49,6 +50,14 @@ namespace Apache.Ignite.Core.Cache.Affinity /// The reader. internal AffinityFunctionContext(IBinaryRawReader reader) { + Debug.Assert(reader != null); + + _currentTopologySnapshot = IgniteUtils.ReadNodes(reader); + _backups = reader.ReadInt(); + _currentTopologyVersion = new AffinityTopologyVersion(reader.ReadLong(), reader.ReadInt()); + _discoveryEvent = EventReader.Read(reader); + + // Prev assignment var cnt = reader.ReadInt(); if (cnt > 0) @@ -58,11 +67,6 @@ namespace Apache.Ignite.Core.Cache.Affinity for (var i = 0; i < cnt; i++) _previousAssignment.Add(IgniteUtils.ReadNodes(reader)); } - - _backups = reader.ReadInt(); - _currentTopologySnapshot = IgniteUtils.ReadNodes(reader); - _currentTopologyVersion = new AffinityTopologyVersion(reader.ReadLong(), reader.ReadInt()); - _discoveryEvent = EventReader.Read(reader); } /// http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/FairAffinityFunction.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/FairAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/FairAffinityFunction.cs index 66fb4bc..4a3885f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/FairAffinityFunction.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/FairAffinityFunction.cs @@ -17,12 +17,16 @@ namespace Apache.Ignite.Core.Cache.Affinity.Fair { + using System; + /// /// Fair affinity function which tries to ensure that all nodes get equal number of partitions with /// minimum amount of reassignments between existing nodes. /// + [Serializable] public class FairAffinityFunction : AffinityFunctionBase { // No-op. + // Actual implementation is in Java, see AffinityFunctionSerializer.Write method. } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/RendezvousAffinityFunction.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/RendezvousAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/RendezvousAffinityFunction.cs index edc6af0..98ec364 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/RendezvousAffinityFunction.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Rendezvous/RendezvousAffinityFunction.cs @@ -17,11 +17,15 @@ namespace Apache.Ignite.Core.Cache.Affinity.Rendezvous { + using System; + /// /// Affinity function for partitioned cache based on Highest Random Weight algorithm. /// + [Serializable] public class RendezvousAffinityFunction : AffinityFunctionBase { // No-op. + // Actual implementation is in Java, see AffinityFunctionSerializer.Write method. } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs index 54f4753..e5e79cd 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Configuration/CacheConfiguration.cs @@ -34,6 +34,7 @@ namespace Apache.Ignite.Core.Cache.Configuration using Apache.Ignite.Core.Cache.Store; using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Impl.Binary; + using Apache.Ignite.Core.Impl.Cache.Affinity; /// /// Defines grid cache configuration. @@ -275,7 +276,7 @@ namespace Apache.Ignite.Core.Cache.Configuration NearConfiguration = reader.ReadBoolean() ? new NearCacheConfiguration(reader) : null; EvictionPolicy = EvictionPolicyBase.Read(reader); - AffinityFunction = AffinityFunctionBase.Read(reader); + AffinityFunction = AffinityFunctionSerializer.Read(reader); } /// @@ -348,7 +349,7 @@ namespace Apache.Ignite.Core.Cache.Configuration writer.WriteBoolean(false); EvictionPolicyBase.Write(writer, EvictionPolicy); - AffinityFunctionBase.Write(writer, AffinityFunction); + AffinityFunctionSerializer.Write(writer, AffinityFunction); } /// http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs index 48eeec2..c7e7eff 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs @@ -32,6 +32,7 @@ namespace Apache.Ignite.Core using Apache.Ignite.Core.Impl; using Apache.Ignite.Core.Impl.Binary; using Apache.Ignite.Core.Impl.Binary.IO; + using Apache.Ignite.Core.Impl.Cache.Affinity; using Apache.Ignite.Core.Impl.Common; using Apache.Ignite.Core.Impl.Handle; using Apache.Ignite.Core.Impl.Memory; @@ -354,7 +355,10 @@ namespace Apache.Ignite.Core var writer = reader.Marshaller.StartMarshal(outStream); for (var i = 0; i < cnt; i++) - writer.WriteInt(CreateObject(reader).Partitions); + { + var objHolder = new ObjectInfoHolder(reader); + AffinityFunctionSerializer.Write(writer, objHolder.CreateInstance(), objHolder); + } } /// http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs index 87de0eb..7f949d0 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryReaderExtensions.cs @@ -20,6 +20,7 @@ namespace Apache.Ignite.Core.Impl.Binary using System; using System.Collections.Generic; using Apache.Ignite.Core.Binary; + using Apache.Ignite.Core.Impl.Common; /// /// Reader extensions. @@ -91,5 +92,18 @@ namespace Apache.Ignite.Core.Impl.Binary { return reader.ReadBoolean() ? reader.ReadBoolean() : (bool?) null; } + + /// + /// Reads the object either as a normal object or as a [typeName+props] wrapper. + /// + public static T ReadObjectEx(this IBinaryRawReader reader) + { + var obj = reader.ReadObject(); + + if (obj == null) + return default(T); + + return obj is T ? (T) obj : ((ObjectInfoHolder) obj).CreateInstance(); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/11d97f17/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs index 5836b48..d222c2e 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/Marshaller.cs @@ -575,6 +575,7 @@ namespace Apache.Ignite.Core.Impl.Binary AddSystemType(BinaryUtils.TypeStreamReceiverHolder, w => new StreamReceiverHolder(w)); AddSystemType(0, w => new AffinityKey(w), "affKey"); AddSystemType(BinaryUtils.TypePlatformJavaObjectFactoryProxy, w => new PlatformJavaObjectFactoryProxy()); + AddSystemType(0, w => new ObjectInfoHolder(w)); } } }