Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 62812200B43 for ; Tue, 28 Jun 2016 15:07:00 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 61235160A56; Tue, 28 Jun 2016 13:07:00 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 278DB160A6F for ; Tue, 28 Jun 2016 15:06:56 +0200 (CEST) Received: (qmail 16159 invoked by uid 500); 28 Jun 2016 13:06:55 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 14180 invoked by uid 99); 28 Jun 2016 13:06:51 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Jun 2016 13:06:51 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0198AEE687; Tue, 28 Jun 2016 13:06:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Tue, 28 Jun 2016 13:07:17 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [30/50] [abbrv] ignite git commit: IGNITE-3328 .NET: Support user-defined AffinityFunction archived-at: Tue, 28 Jun 2016 13:07:00 -0000 IGNITE-3328 .NET: Support user-defined AffinityFunction This closes #826 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e1c755c7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e1c755c7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e1c755c7 Branch: refs/heads/ignite-1232 Commit: e1c755c7ec1e8e7d6fc60b722c32d25bf188cd6b Parents: 6cfd991 Author: Pavel Tupitsyn Authored: Fri Jun 24 18:57:07 2016 +0300 Committer: Pavel Tupitsyn Committed: Fri Jun 24 18:57:07 2016 +0300 ---------------------------------------------------------------------- .../GridAffinityFunctionContextImpl.java | 9 + .../affinity/PlatformAffinityFunction.java | 242 ++++++++++++++++ .../callback/PlatformCallbackGateway.java | 87 ++++++ .../callback/PlatformCallbackUtils.java | 46 +++ .../utils/PlatformConfigurationUtils.java | 13 +- .../platforms/cpp/jni/include/ignite/jni/java.h | 19 +- modules/platforms/cpp/jni/src/java.cpp | 35 ++- .../Apache.Ignite.Core.Tests.csproj | 5 +- .../Cache/Affinity/AffinityFieldTest.cs | 199 +++++++++++++ .../Cache/Affinity/AffinityFunctionTest.cs | 282 +++++++++++++++++++ .../Cache/Affinity/AffinityTest.cs | 138 +++++++++ .../Cache/CacheAffinityFieldTest.cs | 199 ------------- .../Cache/CacheAffinityTest.cs | 139 --------- .../Cache/CacheConfigurationTest.cs | 6 +- .../native-client-test-cache-affinity.xml | 2 +- .../IgniteConfigurationSerializerTest.cs | 14 +- .../Apache.Ignite.Core.Tests/TestRunner.cs | 6 +- .../Apache.Ignite.Core.csproj | 2 + .../Cache/Affinity/AffinityFunctionBase.cs | 102 ++++++- .../Cache/Affinity/AffinityFunctionContext.cs | 116 ++++++++ .../Cache/Affinity/AffinityTopologyVersion.cs | 138 +++++++++ .../Cache/Affinity/IAffinityFunction.cs | 55 +++- .../Cache/Configuration/CacheConfiguration.cs | 9 +- .../Apache.Ignite.Core/Events/EventBase.cs | 2 +- .../Apache.Ignite.Core/Events/EventReader.cs | 8 +- .../Apache.Ignite.Core/IgniteConfiguration.cs | 1 + .../IgniteConfigurationSection.xsd | 4 +- .../dotnet/Apache.Ignite.Core/Ignition.cs | 4 +- .../Impl/Unmanaged/UnmanagedCallbackHandlers.cs | 6 + .../Impl/Unmanaged/UnmanagedCallbacks.cs | 120 +++++++- .../dotnet/Apache.Ignite.sln.DotSettings | 7 +- 31 files changed, 1629 insertions(+), 386 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java index 6c97efd..e2bb99d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java @@ -80,4 +80,13 @@ public class GridAffinityFunctionContextImpl implements AffinityFunctionContext @Override public int backups() { return backups; } + + /** + * Gets the previous assignment. + * + * @return Previous assignment + */ + public List> prevAssignment() { + return prevAssignment; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java new file mode 100644 index 0000000..4da5e24 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache.affinity; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.binary.BinaryRawReaderEx; +import org.apache.ignite.internal.binary.BinaryRawWriterEx; +import org.apache.ignite.internal.cluster.IgniteClusterEx; +import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import org.apache.ignite.lifecycle.LifecycleAware; +import org.apache.ignite.resources.IgniteInstanceResource; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +/** + * Platform AffinityFunction. + */ +public class PlatformAffinityFunction implements AffinityFunction, Externalizable, LifecycleAware { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private Object userFunc; + + /** */ + private int partitions; + + /** */ + private transient Ignite ignite; + + /** */ + private transient PlatformContext ctx; + + /** */ + private transient long ptr; + + /** + * Ctor for serialization. + * + */ + public PlatformAffinityFunction() { + partitions = -1; + } + + /** + * Ctor. + * + * @param func User fun object. + * @param partitions Initial number of partitions. + */ + public PlatformAffinityFunction(Object func, int partitions) { + userFunc = func; + this.partitions = partitions; + } + + /** {@inheritDoc} */ + public Object getUserFunc() { + return userFunc; + } + + /** {@inheritDoc} */ + @Override public void reset() { + // No-op: userFunc is always in initial state (it is serialized only once on start). + } + + /** {@inheritDoc} */ + @Override public int partitions() { + // Affinity function can not return different number of partitions, + // so we pass this value once from the platform. + assert partitions > 0; + + return partitions; + } + + /** {@inheritDoc} */ + @Override public int partition(Object key) { + assert ctx != null; + assert ptr != 0; + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + BinaryRawWriterEx writer = ctx.writer(out); + + writer.writeObject(key); + + out.synchronize(); + + return ctx.gateway().affinityFunctionPartition(ptr, mem.pointer()); + } + } + + /** {@inheritDoc} */ + @Override public List> assignPartitions(AffinityFunctionContext affCtx) { + assert ctx != null; + assert ptr != 0; + assert affCtx != null; + + try (PlatformMemory outMem = ctx.memory().allocate()) { + try (PlatformMemory inMem = ctx.memory().allocate()) { + PlatformOutputStream out = outMem.output(); + BinaryRawWriterEx writer = ctx.writer(out); + + // Write previous assignment + List> prevAssignment = ((GridAffinityFunctionContextImpl)affCtx).prevAssignment(); + + if (prevAssignment == null) + writer.writeInt(-1); + else { + writer.writeInt(prevAssignment.size()); + + for (List part : prevAssignment) + ctx.writeNodes(writer, part); + } + + // Write other props + writer.writeInt(affCtx.backups()); + ctx.writeNodes(writer, affCtx.currentTopologySnapshot()); + writer.writeLong(affCtx.currentTopologyVersion().topologyVersion()); + writer.writeInt(affCtx.currentTopologyVersion().minorTopologyVersion()); + ctx.writeEvent(writer, affCtx.discoveryEvent()); + + // Call platform + out.synchronize(); + ctx.gateway().affinityFunctionAssignPartitions(ptr, outMem.pointer(), inMem.pointer()); + + PlatformInputStream in = inMem.input(); + BinaryRawReaderEx reader = ctx.reader(in); + + // Read result + int partCnt = in.readInt(); + List> res = new ArrayList<>(partCnt); + IgniteClusterEx cluster = ctx.kernalContext().grid().cluster(); + + for (int i = 0; i < partCnt; i++) { + int partSize = in.readInt(); + List part = new ArrayList<>(partSize); + + for (int j = 0; j < partSize; j++) + part.add(cluster.node(reader.readUuid())); + + res.add(part); + } + + return res; + } + } + } + + /** {@inheritDoc} */ + @Override public void removeNode(UUID nodeId) { + assert ctx != null; + assert ptr != 0; + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + BinaryRawWriterEx writer = ctx.writer(out); + + writer.writeUuid(nodeId); + + out.synchronize(); + + ctx.gateway().affinityFunctionRemoveNode(ptr, mem.pointer()); + } + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(userFunc); + out.writeInt(partitions); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + userFunc = in.readObject(); + partitions = in.readInt(); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + assert ignite != null; + ctx = PlatformUtils.platformContext(ignite); + assert ctx != null; + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + BinaryRawWriterEx writer = ctx.writer(out); + + writer.writeObject(userFunc); + + out.synchronize(); + + ptr = ctx.gateway().affinityFunctionInit(mem.pointer()); + } + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + assert ctx != null; + + ctx.gateway().affinityFunctionDestroy(ptr); + } + + /** + * Injects the Ignite. + * + * @param ignite Ignite. + */ + @IgniteInstanceResource + private void setIgnite(Ignite ignite) { + this.ignite = ignite; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java index 3439f38..3708e8f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java @@ -951,6 +951,93 @@ public class PlatformCallbackGateway { } /** + * Initializes affinity function. + * + * @param memPtr Pointer to a stream with serialized affinity function. + * @return Affinity function pointer. + */ + public long affinityFunctionInit(long memPtr) { + enter(); + + try { + return PlatformCallbackUtils.affinityFunctionInit(envPtr, memPtr); + } + finally { + leave(); + } + } + + /** + * Gets the partition from affinity function. + * + * @param ptr Affinity function pointer. + * @param memPtr Pointer to a stream with key object. + * @return Partition number for a given key. + */ + public int affinityFunctionPartition(long ptr, long memPtr) { + enter(); + + try { + return PlatformCallbackUtils.affinityFunctionPartition(envPtr, ptr, memPtr); + } + finally { + leave(); + } + } + + /** + * Assigns the affinity partitions. + * + * @param ptr Affinity function pointer. + * @param outMemPtr Pointer to a stream with affinity context. + * @param inMemPtr Pointer to a stream with result. + */ + public void affinityFunctionAssignPartitions(long ptr, long outMemPtr, long inMemPtr){ + enter(); + + try { + PlatformCallbackUtils.affinityFunctionAssignPartitions(envPtr, ptr, outMemPtr, inMemPtr); + } + finally { + leave(); + } + } + + /** + * Removes the node from affinity function. + * + * @param ptr Affinity function pointer. + * @param memPtr Pointer to a stream with node id. + */ + public void affinityFunctionRemoveNode(long ptr, long memPtr) { + enter(); + + try { + PlatformCallbackUtils.affinityFunctionRemoveNode(envPtr, ptr, memPtr); + } + finally { + leave(); + } + } + + /** + * Destroys the affinity function. + * + * @param ptr Affinity function pointer. + */ + public void affinityFunctionDestroy(long ptr) { + if (!lock.enterBusy()) + return; // skip: destroy is not necessary during shutdown. + + try { + PlatformCallbackUtils.affinityFunctionDestroy(envPtr, ptr); + } + finally { + leave(); + } + } + + /** * Enter gateway. */ protected void enter() { http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java index f7d6586..d19782d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java @@ -496,6 +496,52 @@ public class PlatformCallbackUtils { static native void onClientReconnected(long envPtr, boolean clusterRestarted); /** + * Initializes affinity function. + * + * @param envPtr Environment pointer. + * @param memPtr Pointer to a stream with serialized affinity function. + * @return Affinity function pointer. + */ + static native long affinityFunctionInit(long envPtr, long memPtr); + + /** + * Gets the partition from affinity function. + * + * @param envPtr Environment pointer. + * @param ptr Affinity function pointer. + * @param memPtr Pointer to a stream with key object. + * @return Partition number for a given key. + */ + static native int affinityFunctionPartition(long envPtr, long ptr, long memPtr); + + /** + * Assigns the affinity partitions. + * + * @param envPtr Environment pointer. + * @param ptr Affinity function pointer. + * @param outMemPtr Pointer to a stream with affinity context. + * @param inMemPtr Pointer to a stream with result. + */ + static native void affinityFunctionAssignPartitions(long envPtr, long ptr, long outMemPtr, long inMemPtr); + + /** + * Removes the node from affinity function. + * + * @param envPtr Environment pointer. + * @param ptr Affinity function pointer. + * @param memPtr Pointer to a stream with node id. + */ + static native void affinityFunctionRemoveNode(long envPtr, long ptr, long memPtr); + + /** + * Destroys the affinity function. + * + * @param envPtr Environment pointer. + * @param ptr Affinity function pointer. + */ + static native void affinityFunctionDestroy(long envPtr, long ptr); + + /** * Private constructor. */ private PlatformCallbackUtils() { http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java index 29b6a70..7353f08 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java @@ -41,6 +41,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.internal.binary.*; +import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunction; import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryConfiguration; import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryTypeConfiguration; import org.apache.ignite.platform.dotnet.PlatformDotNetCacheStoreFactoryNative; @@ -237,7 +238,7 @@ public class PlatformConfigurationUtils { * @param in Stream. * @return Affinity function. */ - private static AffinityFunction readAffinityFunction(BinaryRawReader in) { + private static AffinityFunction readAffinityFunction(BinaryRawReaderEx in) { byte plcTyp = in.readByte(); switch (plcTyp) { @@ -255,6 +256,9 @@ public class PlatformConfigurationUtils { f.setExcludeNeighbors(in.readBoolean()); return f; } + case 3: { + return new PlatformAffinityFunction(in.readObjectDetached(), in.readInt()); + } default: assert false; } @@ -296,6 +300,13 @@ public class PlatformConfigurationUtils { out.writeInt(f0.getPartitions()); out.writeBoolean(f0.isExcludeNeighbors()); } + else if (f instanceof PlatformAffinityFunction) { + out.writeByte((byte)3); + + PlatformAffinityFunction f0 = (PlatformAffinityFunction)f; + out.writeObject(f0.getUserFunc()); + out.writeInt(f.partitions()); + } else { out.writeByte((byte)0); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/cpp/jni/include/ignite/jni/java.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/include/ignite/jni/java.h b/modules/platforms/cpp/jni/include/ignite/jni/java.h index 3d45ec0..98779c4 100644 --- a/modules/platforms/cpp/jni/include/ignite/jni/java.h +++ b/modules/platforms/cpp/jni/include/ignite/jni/java.h @@ -21,7 +21,6 @@ #include #include "ignite/common/common.h" -#include "ignite/ignite_error.h" namespace ignite { @@ -101,6 +100,12 @@ namespace ignite typedef void(JNICALL *OnClientDisconnectedHandler)(void* target); typedef void(JNICALL *OnClientReconnectedHandler)(void* target, unsigned char clusterRestarted); + typedef long long(JNICALL *AffinityFunctionInitHandler)(void* target, long long memPtr); + typedef int(JNICALL *AffinityFunctionPartitionHandler)(void* target, long long ptr, long long memPtr); + typedef void(JNICALL *AffinityFunctionAssignPartitionsHandler)(void* target, long long ptr, long long inMemPtr, long long outMemPtr); + typedef void(JNICALL *AffinityFunctionRemoveNodeHandler)(void* target, long long ptr, long long memPtr); + typedef void(JNICALL *AffinityFunctionDestroyHandler)(void* target, long long ptr); + /** * JNI handlers holder. */ @@ -178,6 +183,12 @@ namespace ignite OnClientDisconnectedHandler onClientDisconnected; OnClientReconnectedHandler onClientReconnected; + + AffinityFunctionInitHandler affinityFunctionInit; + AffinityFunctionPartitionHandler affinityFunctionPartition; + AffinityFunctionAssignPartitionsHandler affinityFunctionAssignPartitions; + AffinityFunctionRemoveNodeHandler affinityFunctionRemoveNode; + AffinityFunctionDestroyHandler affinityFunctionDestroy; }; /** @@ -740,6 +751,12 @@ namespace ignite JNIEXPORT void JNICALL JniOnClientDisconnected(JNIEnv *env, jclass cls, jlong envPtr); JNIEXPORT void JNICALL JniOnClientReconnected(JNIEnv *env, jclass cls, jlong envPtr, jboolean clusterRestarted); + + JNIEXPORT jlong JNICALL JniAffinityFunctionInit(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr); + JNIEXPORT jint JNICALL JniAffinityFunctionPartition(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr); + JNIEXPORT void JNICALL JniAffinityFunctionAssignPartitions(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong inMemPtr, jlong outMemPtr); + JNIEXPORT void JNICALL JniAffinityFunctionRemoveNode(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr); + JNIEXPORT void JNICALL JniAffinityFunctionDestroy(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/cpp/jni/src/java.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/src/java.cpp b/modules/platforms/cpp/jni/src/java.cpp index 66be0ca..577ee26 100644 --- a/modules/platforms/cpp/jni/src/java.cpp +++ b/modules/platforms/cpp/jni/src/java.cpp @@ -22,6 +22,7 @@ #include "ignite/jni/utils.h" #include "ignite/common/concurrent.h" #include "ignite/jni/java.h" +#include #define IGNITE_SAFE_PROC_NO_ARG(jniEnv, envPtr, type, field) { \ JniHandlers* hnds = reinterpret_cast(envPtr); \ @@ -362,6 +363,12 @@ namespace ignite JniMethod M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_DISCONNECTED = JniMethod("onClientDisconnected", "(J)V", true); JniMethod M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_RECONNECTED = JniMethod("onClientReconnected", "(JZ)V", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT = JniMethod("affinityFunctionInit", "(JJ)J", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_PARTITION = JniMethod("affinityFunctionPartition", "(JJJ)I", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_ASSIGN_PARTITIONS = JniMethod("affinityFunctionAssignPartitions", "(JJJJ)V", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_REMOVE_NODE = JniMethod("affinityFunctionRemoveNode", "(JJJ)V", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_DESTROY = JniMethod("affinityFunctionDestroy", "(JJ)V", true); + const char* C_PLATFORM_UTILS = "org/apache/ignite/internal/processors/platform/utils/PlatformUtils"; JniMethod M_PLATFORM_UTILS_REALLOC = JniMethod("reallocate", "(JI)V", true); JniMethod M_PLATFORM_UTILS_ERR_DATA = JniMethod("errorData", "(Ljava/lang/Throwable;)[B", true); @@ -821,7 +828,7 @@ namespace ignite void RegisterNatives(JNIEnv* env) { { - JNINativeMethod methods[54]; + JNINativeMethod methods[59]; int idx = 0; @@ -898,6 +905,12 @@ namespace ignite AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_DISCONNECTED, reinterpret_cast(JniOnClientDisconnected)); AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_RECONNECTED, reinterpret_cast(JniOnClientReconnected)); + AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT, reinterpret_cast(JniAffinityFunctionInit)); + AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_PARTITION, reinterpret_cast(JniAffinityFunctionPartition)); + AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_ASSIGN_PARTITIONS, reinterpret_cast(JniAffinityFunctionAssignPartitions)); + AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_REMOVE_NODE, reinterpret_cast(JniAffinityFunctionRemoveNode)); + AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_DESTROY, reinterpret_cast(JniAffinityFunctionDestroy)); + jint res = env->RegisterNatives(FindClass(env, C_PLATFORM_CALLBACK_UTILS), methods, idx); if (res != JNI_OK) @@ -2833,6 +2846,26 @@ namespace ignite JNIEXPORT void JNICALL JniOnClientReconnected(JNIEnv *env, jclass cls, jlong envPtr, jboolean clusterRestarted) { IGNITE_SAFE_PROC(env, envPtr, OnClientReconnectedHandler, onClientReconnected, clusterRestarted); } + + JNIEXPORT jlong JNICALL JniAffinityFunctionInit(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr) { + IGNITE_SAFE_FUNC(env, envPtr, AffinityFunctionInitHandler, affinityFunctionInit, memPtr); + } + + JNIEXPORT jint JNICALL JniAffinityFunctionPartition(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr) { + IGNITE_SAFE_FUNC(env, envPtr, AffinityFunctionPartitionHandler, affinityFunctionPartition, ptr, memPtr); + } + + JNIEXPORT void JNICALL JniAffinityFunctionAssignPartitions(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong inMemPtr, jlong outMemPtr) { + IGNITE_SAFE_PROC(env, envPtr, AffinityFunctionAssignPartitionsHandler, affinityFunctionAssignPartitions, ptr, inMemPtr, outMemPtr); + } + + JNIEXPORT void JNICALL JniAffinityFunctionRemoveNode(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr) { + IGNITE_SAFE_PROC(env, envPtr, AffinityFunctionRemoveNodeHandler, affinityFunctionRemoveNode, ptr, memPtr); + } + + JNIEXPORT void JNICALL JniAffinityFunctionDestroy(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr) { + IGNITE_SAFE_PROC(env, envPtr, AffinityFunctionDestroyHandler, affinityFunctionDestroy, ptr); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj index 1a367d4..15e46ae 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj @@ -59,13 +59,14 @@ - + + - + http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFieldTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFieldTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFieldTest.cs new file mode 100644 index 0000000..ceb04cd --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFieldTest.cs @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// ReSharper disable UnusedAutoPropertyAccessor.Local +namespace Apache.Ignite.Core.Tests.Cache.Affinity +{ + using System; + using System.Collections.Generic; + using System.Linq; + using Apache.Ignite.Core.Binary; + using Apache.Ignite.Core.Cache; + using Apache.Ignite.Core.Cache.Affinity; + using Apache.Ignite.Core.Cache.Configuration; + using Apache.Ignite.Core.Tests.Compute; + using NUnit.Framework; + + /// + /// Tests custom affinity mapping. + /// + public class AffinityFieldTest + { + /** */ + private ICache _cache1; + + /** */ + private ICache _cache2; + + /// + /// Fixture set up. + /// + [TestFixtureSetUp] + public void FixtureSetUp() + { + var grid1 = Ignition.Start(GetConfig()); + var grid2 = Ignition.Start(GetConfig("grid2")); + + _cache1 = grid1.CreateCache(new CacheConfiguration + { + CacheMode = CacheMode.Partitioned + }); + _cache2 = grid2.GetCache(null); + } + + /// + /// Fixture tear down. + /// + [TestFixtureTearDown] + public void FixtureTearDown() + { + Ignition.StopAll(true); + } + + /// + /// Tests the metadata. + /// + [Test] + public void TestMetadata() + { + // Put keys to update meta + _cache1.Put(new CacheKey(), string.Empty); + _cache1.Put(new CacheKeyAttr(), string.Empty); + _cache1.Put(new CacheKeyAttrOverride(), string.Empty); + + // Verify + foreach (var type in new[] { typeof(CacheKey) , typeof(CacheKeyAttr), typeof(CacheKeyAttrOverride)}) + { + Assert.AreEqual("AffinityKey", _cache1.Ignite.GetBinary().GetBinaryType(type).AffinityKeyFieldName); + Assert.AreEqual("AffinityKey", _cache2.Ignite.GetBinary().GetBinaryType(type).AffinityKeyFieldName); + } + } + + /// + /// Tests that keys are located properly in cache partitions. + /// + [Test] + public void TestKeyLocation() + { + TestKeyLocation0((key, affKey) => new CacheKey {Key = key, AffinityKey = affKey}); + TestKeyLocation0((key, affKey) => new CacheKeyAttr {Key = key, AffinityKey = affKey}); + TestKeyLocation0((key, affKey) => new CacheKeyAttrOverride {Key = key, AffinityKey = affKey}); + } + + /// + /// Tests the class. + /// + [Test] + public void TestAffinityKeyClass() + { + // Check location + TestKeyLocation0((key, affKey) => new AffinityKey(key, affKey)); + + // Check meta + Assert.AreEqual("affKey", + _cache1.Ignite.GetBinary().GetBinaryType(typeof (AffinityKey)).AffinityKeyFieldName); + } + + /// + /// Tests class interoperability. + /// + [Test] + public void TestInterop() + { + var affKey = _cache1.Ignite.GetCompute() + .ExecuteJavaTask(ComputeApiTest.EchoTask, ComputeApiTest.EchoTypeAffinityKey); + + Assert.AreEqual("interopAffinityKey", affKey.Key); + } + + /// + /// Tests the key location. + /// + private void TestKeyLocation0(Func ctor) + { + var aff = _cache1.Ignite.GetAffinity(_cache1.Name); + + foreach (var cache in new[] { _cache1, _cache2 }) + { + cache.RemoveAll(); + + var localNode = cache.Ignite.GetCluster().GetLocalNode(); + + var localKeys = Enumerable.Range(1, int.MaxValue) + .Where(x => aff.MapKeyToNode(x).Id == localNode.Id).Take(100).ToArray(); + + for (int index = 0; index < localKeys.Length; index++) + { + var cacheKey = ctor(index, localKeys[index]); + + cache.Put(cacheKey, index.ToString()); + + // Verify that key is stored locally according to AffinityKeyFieldName + Assert.AreEqual(index.ToString(), cache.LocalPeek(cacheKey, CachePeekMode.Primary)); + + // Other cache does not have this key locally + var otherCache = cache == _cache1 ? _cache2 : _cache1; + Assert.Throws(() => otherCache.LocalPeek(cacheKey, CachePeekMode.All)); + } + } + } + + /// + /// Gets the configuration. + /// + private static IgniteConfiguration GetConfig(string gridName = null) + { + return new IgniteConfiguration(TestUtils.GetTestConfiguration()) + { + GridName = gridName, + BinaryConfiguration = new BinaryConfiguration + { + TypeConfigurations = new[] + { + new BinaryTypeConfiguration(typeof (CacheKey)) + { + AffinityKeyFieldName = "AffinityKey" + }, + new BinaryTypeConfiguration(typeof(CacheKeyAttr)), + new BinaryTypeConfiguration(typeof (CacheKeyAttrOverride)) + { + AffinityKeyFieldName = "AffinityKey" + } + } + }, + }; + } + + private class CacheKey + { + public int Key { get; set; } + public int AffinityKey { get; set; } + } + + private class CacheKeyAttr + { + public int Key { get; set; } + [AffinityKeyMapped] public int AffinityKey { get; set; } + } + + private class CacheKeyAttrOverride + { + [AffinityKeyMapped] public int Key { get; set; } + public int AffinityKey { get; set; } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionTest.cs new file mode 100644 index 0000000..70e0d78 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionTest.cs @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.Cache.Affinity +{ + using System; + using System.Collections.Concurrent; + using System.Collections.Generic; + using System.Linq; + using Apache.Ignite.Core.Cache; + using Apache.Ignite.Core.Cache.Affinity; + using Apache.Ignite.Core.Cache.Configuration; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Common; + using Apache.Ignite.Core.Resource; + using NUnit.Framework; + + /// + /// Tests user-defined + /// + public class AffinityFunctionTest + { + /** */ + private IIgnite _ignite; + + /** */ + private IIgnite _ignite2; + + /** */ + private const string CacheName = "cache"; + + /** */ + private const int PartitionCount = 10; + + /** */ + private static readonly ConcurrentBag RemovedNodes = new ConcurrentBag(); + + /** */ + private static readonly ConcurrentBag Contexts = + new ConcurrentBag(); + + /// + /// Fixture set up. + /// + [TestFixtureSetUp] + public void FixtureSetUp() + { + var cfg = new IgniteConfiguration(TestUtils.GetTestConfiguration()) + { + CacheConfiguration = new[] + { + new CacheConfiguration(CacheName) + { + AffinityFunction = new SimpleAffinityFunction(), + Backups = 7 + } + } + }; + + _ignite = Ignition.Start(cfg); + + _ignite2 = Ignition.Start(new IgniteConfiguration(TestUtils.GetTestConfiguration()) {GridName = "grid2"}); + } + + /// + /// Fixture tear down. + /// + [TestFixtureTearDown] + public void FixtureTearDown() + { + // Check that affinity handles are present + TestUtils.AssertHandleRegistryHasItems(_ignite, _ignite.GetCacheNames().Count, 0); + TestUtils.AssertHandleRegistryHasItems(_ignite2, _ignite.GetCacheNames().Count, 0); + + // Destroy all caches + _ignite.GetCacheNames().ToList().ForEach(_ignite.DestroyCache); + Assert.AreEqual(0, _ignite.GetCacheNames().Count); + + // Check that all affinity functions got released + TestUtils.AssertHandleRegistryIsEmpty(1000, _ignite, _ignite2); + + Ignition.StopAll(true); + } + + /// + /// Tests the static cache. + /// + [Test] + public void TestStaticCache() + { + VerifyCacheAffinity(_ignite.GetCache(CacheName)); + VerifyCacheAffinity(_ignite2.GetCache(CacheName)); + } + + /// + /// Tests the dynamic cache. + /// + [Test] + public void TestDynamicCache() + { + const string cacheName = "dynCache"; + + VerifyCacheAffinity(_ignite.CreateCache(new CacheConfiguration(cacheName) + { + AffinityFunction = new SimpleAffinityFunction(), + Backups = 5 + })); + + VerifyCacheAffinity(_ignite2.GetCache(cacheName)); + + // Verify context for new cache + var lastCtx = Contexts.Where(x => x.GetPreviousAssignment(1) == null) + .OrderBy(x => x.DiscoveryEvent.Timestamp).Last(); + + Assert.AreEqual(new AffinityTopologyVersion(2, 1), lastCtx.CurrentTopologyVersion); + Assert.AreEqual(5, lastCtx.Backups); + + // Verify context for old cache + var ctx = Contexts.Where(x => x.GetPreviousAssignment(1) != null) + .OrderBy(x => x.DiscoveryEvent.Timestamp).Last(); + + Assert.AreEqual(new AffinityTopologyVersion(2, 0), ctx.CurrentTopologyVersion); + Assert.AreEqual(7, ctx.Backups); + CollectionAssert.AreEquivalent(_ignite.GetCluster().GetNodes(), ctx.CurrentTopologySnapshot); + + var evt = ctx.DiscoveryEvent; + CollectionAssert.AreEquivalent(_ignite.GetCluster().GetNodes(), evt.TopologyNodes); + CollectionAssert.Contains(_ignite.GetCluster().GetNodes(), evt.EventNode); + Assert.AreEqual(_ignite.GetCluster().TopologyVersion, evt.TopologyVersion); + + var firstTop = _ignite.GetCluster().GetTopology(1); + var parts = Enumerable.Range(0, PartitionCount).ToArray(); + CollectionAssert.AreEqual(parts.Select(x => firstTop), parts.Select(x => ctx.GetPreviousAssignment(x))); + } + + /// + /// Verifies the cache affinity. + /// + private static void VerifyCacheAffinity(ICache cache) + { + Assert.IsInstanceOf(cache.GetConfiguration().AffinityFunction); + + var aff = cache.Ignite.GetAffinity(cache.Name); + Assert.AreEqual(PartitionCount, aff.Partitions); + + for (int i = 0; i < 100; i++) + Assert.AreEqual(i % PartitionCount, aff.GetPartition(i)); + } + + /// + /// Tests the RemoveNode method. + /// + [Test] + public void TestRemoveNode() + { + Assert.AreEqual(0, RemovedNodes.Count); + + Guid expectedNodeId; + + using (var ignite = Ignition.Start(new IgniteConfiguration(TestUtils.GetTestConfiguration()) + { + GridName = "grid3", + })) + { + expectedNodeId = ignite.GetCluster().GetLocalNode().Id; + Assert.AreEqual(0, RemovedNodes.Count); + VerifyCacheAffinity(ignite.GetCache(CacheName)); + } + + // Called on both nodes + TestUtils.WaitForCondition(() => RemovedNodes.Count > 0, 3000); + Assert.AreEqual(expectedNodeId, RemovedNodes.Distinct().Single()); + } + + /// + /// Tests the error on non-serializable function. + /// + [Test] + public void TestNonSerializableFunction() + { + var ex = Assert.Throws(() => + _ignite.CreateCache(new CacheConfiguration("failCache") + { + AffinityFunction = new NonSerializableAffinityFunction() + })); + + Assert.AreEqual(ex.Message, "AffinityFunction should be serializable."); + } + + /// + /// Tests the exception propagation. + /// + [Test] + public void TestExceptionInFunction() + { + var cache = _ignite.CreateCache(new CacheConfiguration("failCache2") + { + AffinityFunction = new FailInGetPartitionAffinityFunction() + }); + + var ex = Assert.Throws(() => cache.Put(1, 2)); + Assert.AreEqual("User error", ex.InnerException.Message); + } + + [Serializable] + private class SimpleAffinityFunction : IAffinityFunction + { + #pragma warning disable 649 // field is never assigned + [InstanceResource] private readonly IIgnite _ignite; + + public int Partitions + { + get { return PartitionCount; } + } + + public int GetPartition(object key) + { + Assert.IsNotNull(_ignite); + + return (int) key % Partitions; + } + + public void RemoveNode(Guid nodeId) + { + RemovedNodes.Add(nodeId); + } + + public IEnumerable> AssignPartitions(AffinityFunctionContext context) + { + Assert.IsNotNull(_ignite); + + Contexts.Add(context); + + // All partitions are the same + return Enumerable.Range(0, Partitions).Select(x => context.CurrentTopologySnapshot); + } + } + + private class NonSerializableAffinityFunction : SimpleAffinityFunction + { + // No-op. + } + + [Serializable] + private class FailInGetPartitionAffinityFunction : IAffinityFunction + { + public int Partitions + { + get { return 5; } + } + + public int GetPartition(object key) + { + throw new ArithmeticException("User error"); + } + + public void RemoveNode(Guid nodeId) + { + // No-op. + } + + public IEnumerable> AssignPartitions(AffinityFunctionContext context) + { + return Enumerable.Range(0, Partitions).Select(x => context.CurrentTopologySnapshot); + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityTest.cs new file mode 100644 index 0000000..e38668b --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityTest.cs @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.Cache.Affinity +{ + using Apache.Ignite.Core.Binary; + using Apache.Ignite.Core.Cache; + using Apache.Ignite.Core.Cluster; + using NUnit.Framework; + + /// + /// Affinity key tests. + /// + public sealed class AffinityTest + { + /// + /// + /// + [TestFixtureSetUp] + public void StartGrids() + { + TestUtils.KillProcesses(); + + for (int i = 0; i < 3; i++) + { + var cfg = new IgniteConfiguration + { + JvmClasspath = TestUtils.CreateTestClasspath(), + JvmOptions = TestUtils.TestJavaOptions(), + SpringConfigUrl = "config\\native-client-test-cache-affinity.xml", + GridName = "grid-" + i + }; + + Ignition.Start(cfg); + } + } + + /// + /// Tear-down routine. + /// + [TestFixtureTearDown] + public void StopGrids() + { + Ignition.StopAll(true); + } + + /// + /// Test affinity key. + /// + [Test] + public void TestAffinity() + { + IIgnite g = Ignition.GetIgnite("grid-0"); + + ICacheAffinity aff = g.GetAffinity(null); + + IClusterNode node = aff.MapKeyToNode(new AffinityTestKey(0, 1)); + + for (int i = 0; i < 10; i++) + Assert.AreEqual(node.Id, aff.MapKeyToNode(new AffinityTestKey(i, 1)).Id); + } + + /// + /// Test affinity with binary flag. + /// + [Test] + public void TestAffinityBinary() + { + IIgnite g = Ignition.GetIgnite("grid-0"); + + ICacheAffinity aff = g.GetAffinity(null); + + IBinaryObject affKey = g.GetBinary().ToBinary(new AffinityTestKey(0, 1)); + + IClusterNode node = aff.MapKeyToNode(affKey); + + for (int i = 0; i < 10; i++) + { + IBinaryObject otherAffKey = + g.GetBinary().ToBinary(new AffinityTestKey(i, 1)); + + Assert.AreEqual(node.Id, aff.MapKeyToNode(otherAffKey).Id); + } + } + + /// + /// Affinity key. + /// + public class AffinityTestKey + { + /** ID. */ + private readonly int _id; + + /** Affinity key. */ + // ReSharper disable once NotAccessedField.Local + private readonly int _affKey; + + /// + /// Constructor. + /// + /// ID. + /// Affinity key. + public AffinityTestKey(int id, int affKey) + { + _id = id; + _affKey = affKey; + } + + /** */ + public override bool Equals(object obj) + { + var other = obj as AffinityTestKey; + + return other != null && _id == other._id; + } + + /** */ + public override int GetHashCode() + { + return _id; + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityFieldTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityFieldTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityFieldTest.cs deleted file mode 100644 index 4fb7738..0000000 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityFieldTest.cs +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// ReSharper disable UnusedAutoPropertyAccessor.Local -namespace Apache.Ignite.Core.Tests.Cache -{ - using System; - using System.Collections.Generic; - using System.Linq; - using Apache.Ignite.Core.Binary; - using Apache.Ignite.Core.Cache; - using Apache.Ignite.Core.Cache.Affinity; - using Apache.Ignite.Core.Cache.Configuration; - using Apache.Ignite.Core.Tests.Compute; - using NUnit.Framework; - - /// - /// Tests custom affinity mapping. - /// - public class CacheAffinityFieldTest - { - /** */ - private ICache _cache1; - - /** */ - private ICache _cache2; - - /// - /// Fixture set up. - /// - [TestFixtureSetUp] - public void FixtureSetUp() - { - var grid1 = Ignition.Start(GetConfig()); - var grid2 = Ignition.Start(GetConfig("grid2")); - - _cache1 = grid1.CreateCache(new CacheConfiguration - { - CacheMode = CacheMode.Partitioned - }); - _cache2 = grid2.GetCache(null); - } - - /// - /// Fixture tear down. - /// - [TestFixtureTearDown] - public void FixtureTearDown() - { - Ignition.StopAll(true); - } - - /// - /// Tests the metadata. - /// - [Test] - public void TestMetadata() - { - // Put keys to update meta - _cache1.Put(new CacheKey(), string.Empty); - _cache1.Put(new CacheKeyAttr(), string.Empty); - _cache1.Put(new CacheKeyAttrOverride(), string.Empty); - - // Verify - foreach (var type in new[] { typeof(CacheKey) , typeof(CacheKeyAttr), typeof(CacheKeyAttrOverride)}) - { - Assert.AreEqual("AffinityKey", _cache1.Ignite.GetBinary().GetBinaryType(type).AffinityKeyFieldName); - Assert.AreEqual("AffinityKey", _cache2.Ignite.GetBinary().GetBinaryType(type).AffinityKeyFieldName); - } - } - - /// - /// Tests that keys are located properly in cache partitions. - /// - [Test] - public void TestKeyLocation() - { - TestKeyLocation0((key, affKey) => new CacheKey {Key = key, AffinityKey = affKey}); - TestKeyLocation0((key, affKey) => new CacheKeyAttr {Key = key, AffinityKey = affKey}); - TestKeyLocation0((key, affKey) => new CacheKeyAttrOverride {Key = key, AffinityKey = affKey}); - } - - /// - /// Tests the class. - /// - [Test] - public void TestAffinityKeyClass() - { - // Check location - TestKeyLocation0((key, affKey) => new AffinityKey(key, affKey)); - - // Check meta - Assert.AreEqual("affKey", - _cache1.Ignite.GetBinary().GetBinaryType(typeof (AffinityKey)).AffinityKeyFieldName); - } - - /// - /// Tests class interoperability. - /// - [Test] - public void TestInterop() - { - var affKey = _cache1.Ignite.GetCompute() - .ExecuteJavaTask(ComputeApiTest.EchoTask, ComputeApiTest.EchoTypeAffinityKey); - - Assert.AreEqual("interopAffinityKey", affKey.Key); - } - - /// - /// Tests the key location. - /// - private void TestKeyLocation0(Func ctor) - { - var aff = _cache1.Ignite.GetAffinity(_cache1.Name); - - foreach (var cache in new[] { _cache1, _cache2 }) - { - cache.RemoveAll(); - - var localNode = cache.Ignite.GetCluster().GetLocalNode(); - - var localKeys = Enumerable.Range(1, int.MaxValue) - .Where(x => aff.MapKeyToNode(x).Id == localNode.Id).Take(100).ToArray(); - - for (int index = 0; index < localKeys.Length; index++) - { - var cacheKey = ctor(index, localKeys[index]); - - cache.Put(cacheKey, index.ToString()); - - // Verify that key is stored locally accroding to AffinityKeyFieldName - Assert.AreEqual(index.ToString(), cache.LocalPeek(cacheKey, CachePeekMode.Primary)); - - // Other cache does not have this key locally - var otherCache = cache == _cache1 ? _cache2 : _cache1; - Assert.Throws(() => otherCache.LocalPeek(cacheKey, CachePeekMode.All)); - } - } - } - - /// - /// Gets the configuration. - /// - private static IgniteConfiguration GetConfig(string gridName = null) - { - return new IgniteConfiguration(TestUtils.GetTestConfiguration()) - { - GridName = gridName, - BinaryConfiguration = new BinaryConfiguration - { - TypeConfigurations = new[] - { - new BinaryTypeConfiguration(typeof (CacheKey)) - { - AffinityKeyFieldName = "AffinityKey" - }, - new BinaryTypeConfiguration(typeof(CacheKeyAttr)), - new BinaryTypeConfiguration(typeof (CacheKeyAttrOverride)) - { - AffinityKeyFieldName = "AffinityKey" - } - } - }, - }; - } - - private class CacheKey - { - public int Key { get; set; } - public int AffinityKey { get; set; } - } - - private class CacheKeyAttr - { - public int Key { get; set; } - [AffinityKeyMapped] public int AffinityKey { get; set; } - } - - private class CacheKeyAttrOverride - { - [AffinityKeyMapped] public int Key { get; set; } - public int AffinityKey { get; set; } - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityTest.cs deleted file mode 100644 index 689804c..0000000 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityTest.cs +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -namespace Apache.Ignite.Core.Tests.Cache -{ - using Apache.Ignite.Core.Binary; - using Apache.Ignite.Core.Cache; - using Apache.Ignite.Core.Cluster; - using Apache.Ignite.Core.Impl; - using NUnit.Framework; - - /// - /// Affinity key tests. - /// - public class CacheAffinityTest - { - /// - /// - /// - [TestFixtureSetUp] - public virtual void StartGrids() - { - TestUtils.KillProcesses(); - - IgniteConfiguration cfg = new IgniteConfiguration(); - - cfg.JvmClasspath = TestUtils.CreateTestClasspath(); - cfg.JvmOptions = TestUtils.TestJavaOptions(); - cfg.SpringConfigUrl = "config\\native-client-test-cache-affinity.xml"; - - for (int i = 0; i < 3; i++) - { - cfg.GridName = "grid-" + i; - - Ignition.Start(cfg); - } - } - - /// - /// Tear-down routine. - /// - [TestFixtureTearDown] - public virtual void StopGrids() - { - for (int i = 0; i < 3; i++) - Ignition.Stop("grid-" + i, true); - } - - /// - /// Test affinity key. - /// - [Test] - public void TestAffinity() - { - IIgnite g = Ignition.GetIgnite("grid-0"); - - ICacheAffinity aff = g.GetAffinity(null); - - IClusterNode node = aff.MapKeyToNode(new AffinityTestKey(0, 1)); - - for (int i = 0; i < 10; i++) - Assert.AreEqual(node.Id, aff.MapKeyToNode(new AffinityTestKey(i, 1)).Id); - } - - /// - /// Test affinity with binary flag. - /// - [Test] - public void TestAffinityBinary() - { - IIgnite g = Ignition.GetIgnite("grid-0"); - - ICacheAffinity aff = g.GetAffinity(null); - - IBinaryObject affKey = g.GetBinary().ToBinary(new AffinityTestKey(0, 1)); - - IClusterNode node = aff.MapKeyToNode(affKey); - - for (int i = 0; i < 10; i++) - { - IBinaryObject otherAffKey = - g.GetBinary().ToBinary(new AffinityTestKey(i, 1)); - - Assert.AreEqual(node.Id, aff.MapKeyToNode(otherAffKey).Id); - } - } - - /// - /// Affinity key. - /// - public class AffinityTestKey - { - /** ID. */ - private int _id; - - /** Affinity key. */ - private int _affKey; - - /// - /// Constructor. - /// - /// ID. - /// Affinity key. - public AffinityTestKey(int id, int affKey) - { - _id = id; - _affKey = affKey; - } - - /** */ - public override bool Equals(object obj) - { - AffinityTestKey other = obj as AffinityTestKey; - - return other != null && _id == other._id; - } - - /** */ - public override int GetHashCode() - { - return _id; - } - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs index eb73abe..da68ca2 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs @@ -271,7 +271,7 @@ namespace Apache.Ignite.Core.Tests.Cache var py = (AffinityFunctionBase) y; Assert.AreEqual(px.GetType(), py.GetType()); - Assert.AreEqual(px.PartitionCount, py.PartitionCount); + Assert.AreEqual(px.Partitions, py.Partitions); Assert.AreEqual(px.ExcludeNeighbors, py.ExcludeNeighbors); } @@ -552,7 +552,7 @@ namespace Apache.Ignite.Core.Tests.Cache }, AffinityFunction = new RendezvousAffinityFunction { - PartitionCount = 513, + Partitions = 513, ExcludeNeighbors = true } }; @@ -645,7 +645,7 @@ namespace Apache.Ignite.Core.Tests.Cache }, AffinityFunction = new FairAffinityFunction { - PartitionCount = 113, + Partitions = 113, ExcludeNeighbors = false } }; http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/native-client-test-cache-affinity.xml ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/native-client-test-cache-affinity.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/native-client-test-cache-affinity.xml index 6fe3e70..9c7bfb0 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/native-client-test-cache-affinity.xml +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/native-client-test-cache-affinity.xml @@ -34,7 +34,7 @@ + value="Apache.Ignite.Core.Tests.Cache.Affinity.AffinityTest+AffinityTestKey"/> http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs index 3056273..e435cf6 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs @@ -97,7 +97,7 @@ namespace Apache.Ignite.Core.Tests - + @@ -172,7 +172,7 @@ namespace Apache.Ignite.Core.Tests var af = cacheCfg.AffinityFunction as RendezvousAffinityFunction; Assert.IsNotNull(af); - Assert.AreEqual(99, af.PartitionCount); + Assert.AreEqual(99, af.Partitions); Assert.IsTrue(af.ExcludeNeighbors); Assert.AreEqual(new Dictionary {{"myNode", "true"}}, cfg.UserAttributes); @@ -365,6 +365,14 @@ namespace Apache.Ignite.Core.Tests IdMapper = new IdMapper(), NameMapper = new NameMapper(), Serializer = new TestSerializer() + }, + new BinaryTypeConfiguration + { + IsEnum = false, + KeepDeserialized = false, + AffinityKeyFieldName = "affKeyFieldName", + TypeName = "typeName2", + Serializer = new BinaryReflectiveSerializer() } }, Types = new[] {typeof (string).FullName}, @@ -448,7 +456,7 @@ namespace Apache.Ignite.Core.Tests AffinityFunction = new FairAffinityFunction { ExcludeNeighbors = true, - PartitionCount = 48 + Partitions = 48 } } }, http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs index c6274ff..726fa3b 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs @@ -22,6 +22,8 @@ namespace Apache.Ignite.Core.Tests using System.Linq; using System.Reflection; using Apache.Ignite.Core.Tests.Binary; + using Apache.Ignite.Core.Tests.Cache.Affinity; + using Apache.Ignite.Core.Tests.Cache.Query; using Apache.Ignite.Core.Tests.Memory; using NUnit.ConsoleRunner; @@ -46,9 +48,9 @@ namespace Apache.Ignite.Core.Tests return; } - TestOne(typeof(BinaryStringTest), "Test"); + //TestOne(typeof(BinaryStringTest), "Test"); - //TestAll(typeof (CacheQueriesCodeConfigurationTest)); + TestAll(typeof (AffinityFunctionTest)); //TestAllInAssembly(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index 46dbd94..e7f772f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -84,6 +84,8 @@ + + http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs index ea5b21c..9b89780 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs @@ -18,10 +18,13 @@ namespace Apache.Ignite.Core.Cache.Affinity { using System; + using System.Collections.Generic; using System.ComponentModel; using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Cache.Affinity.Fair; using Apache.Ignite.Core.Cache.Affinity.Rendezvous; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Common; /// /// Base class for predefined affinity functions. @@ -37,14 +40,67 @@ namespace Apache.Ignite.Core.Cache.Affinity /** */ private const byte TypeCodeRendezvous = 2; - /// The default value for property. - public const int DefaultPartitionCount = 1024; + /** */ + private const byte TypeCodeUser = 3; + + /// The default value for property. + public const int DefaultPartitions = 1024; /// /// Gets or sets the total number of partitions. /// - [DefaultValue(DefaultPartitionCount)] - public int PartitionCount { get; set; } + [DefaultValue(DefaultPartitions)] + public int Partitions { get; set; } + + /// + /// Gets partition number for a given key starting from 0. Partitioned caches + /// should make sure that keys are about evenly distributed across all partitions + /// from 0 to for best performance. + /// + /// Note that for fully replicated caches it is possible to segment key sets among different + /// grid node groups. In that case each node group should return a unique partition + /// number. However, unlike partitioned cache, mappings of keys to nodes in + /// replicated caches are constant and a node cannot migrate from one partition + /// to another. + /// + /// Key to get partition for. + /// + /// Partition number for a given key. + /// + public int GetPartition(object key) + { + throw GetDirectUsageError(); + } + + /// + /// Removes node from affinity. This method is called when it is safe to remove + /// disconnected node from affinity mapping. + /// + /// The node identifier. + public void RemoveNode(Guid nodeId) + { + throw GetDirectUsageError(); + } + + /// + /// Gets affinity nodes for a partition. In case of replicated cache, all returned + /// nodes are updated in the same manner. In case of partitioned cache, the returned + /// list should contain only the primary and back up nodes with primary node being + /// always first. + /// + /// Note that partitioned affinity must obey the following contract: given that node + /// N is primary for some key K, if any other node(s) leave + /// grid and no node joins grid, node N will remain primary for key K. + /// + /// The affinity function context. + /// + /// A collection of partitions, where each partition is a collection of nodes, + /// where first node is a primary node, and other nodes are backup nodes. + /// + public IEnumerable> AssignPartitions(AffinityFunctionContext context) + { + throw GetDirectUsageError(); + } /// /// Gets or sets a value indicating whether to exclude same-host-neighbors from being backups of each other. @@ -56,7 +112,7 @@ namespace Apache.Ignite.Core.Cache.Affinity /// internal AffinityFunctionBase() { - PartitionCount = DefaultPartitionCount; + Partitions = DefaultPartitions; } /// @@ -77,11 +133,16 @@ namespace Apache.Ignite.Core.Cache.Affinity case TypeCodeRendezvous: fun = new RendezvousAffinityFunction(); break; + case TypeCodeUser: + var f = reader.ReadObject(); + reader.ReadInt(); // skip partition count + + return f; default: throw new InvalidOperationException("Invalid AffinityFunction type code: " + typeCode); } - fun.PartitionCount = reader.ReadInt(); + fun.Partitions = reader.ReadInt(); fun.ExcludeNeighbors = reader.ReadBoolean(); return fun; @@ -100,17 +161,30 @@ namespace Apache.Ignite.Core.Cache.Affinity var p = fun as AffinityFunctionBase; - if (p == null) + if (p != null) { - throw new NotSupportedException( - string.Format("Unsupported AffinityFunction: {0}. Only predefined affinity function types " + - "are supported: {1}, {2}", fun.GetType(), typeof(FairAffinityFunction), - typeof(RendezvousAffinityFunction))); + writer.WriteByte(p is FairAffinityFunction ? TypeCodeFair : TypeCodeRendezvous); + writer.WriteInt(p.Partitions); + writer.WriteBoolean(p.ExcludeNeighbors); } + else + { + writer.WriteByte(TypeCodeUser); + + if (!fun.GetType().IsSerializable) + throw new IgniteException("AffinityFunction should be serializable."); - writer.WriteByte(p is FairAffinityFunction ? TypeCodeFair : TypeCodeRendezvous); - writer.WriteInt(p.PartitionCount); - writer.WriteBoolean(p.ExcludeNeighbors); + writer.WriteObject(fun); + writer.WriteInt(fun.Partitions); // partition count is written once and can not be changed. + } + } + + /// + /// Gets the direct usage error. + /// + private Exception GetDirectUsageError() + { + return new IgniteException(GetType() + " can not be used directly."); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs new file mode 100644 index 0000000..1f44d8c --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Cache.Affinity +{ + using System.Collections.Generic; + using Apache.Ignite.Core.Binary; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Events; + using Apache.Ignite.Core.Impl; + + /// + /// Affinity function context. + /// + public class AffinityFunctionContext + { + /** */ + private readonly List> _previousAssignment; + + /** */ + private readonly int _backups; + + /** */ + private readonly ICollection _currentTopologySnapshot; + + /** */ + private readonly AffinityTopologyVersion _currentTopologyVersion; + + /** */ + private readonly DiscoveryEvent _discoveryEvent; + + /// + /// Initializes a new instance of the class. + /// + /// The reader. + internal AffinityFunctionContext(IBinaryRawReader reader) + { + var cnt = reader.ReadInt(); + + if (cnt > 0) + { + _previousAssignment = new List>(cnt); + + for (var i = 0; i < cnt; i++) + _previousAssignment.Add(IgniteUtils.ReadNodes(reader)); + } + + _backups = reader.ReadInt(); + _currentTopologySnapshot = IgniteUtils.ReadNodes(reader); + _currentTopologyVersion = new AffinityTopologyVersion(reader.ReadLong(), reader.ReadInt()); + _discoveryEvent = EventReader.Read(reader); + } + + /// + /// Gets the affinity assignment for given partition on previous topology version. + /// First node in returned list is a primary node, other nodes are backups. + /// + /// The partition to get previous assignment for. + /// + /// List of nodes assigned to a given partition on previous topology version or null + /// if this information is not available. + /// + public ICollection GetPreviousAssignment(int partition) + { + return _previousAssignment == null ? null : _previousAssignment[partition]; + } + + /// + /// Gets number of backups for new assignment. + /// + public int Backups + { + get { return _backups; } + } + + /// + /// Gets the current topology snapshot. Snapshot will contain only nodes on which the particular + /// cache is configured. List of passed nodes is guaranteed to be sorted in a same order + /// on all nodes on which partition assignment is performed. + /// + public ICollection CurrentTopologySnapshot + { + get { return _currentTopologySnapshot; } + } + + /// + /// Gets the current topology version. + /// + public AffinityTopologyVersion CurrentTopologyVersion + { + get { return _currentTopologyVersion; } + } + + /// + /// Gets the discovery event that caused the topology change. + /// + public DiscoveryEvent DiscoveryEvent + { + get { return _discoveryEvent; } + } + } +} \ No newline at end of file