ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [30/50] [abbrv] ignite git commit: IGNITE-3328 .NET: Support user-defined AffinityFunction
Date Tue, 28 Jun 2016 13:07:17 GMT
IGNITE-3328 .NET: Support user-defined AffinityFunction

This closes #826


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e1c755c7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e1c755c7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e1c755c7

Branch: refs/heads/ignite-1232
Commit: e1c755c7ec1e8e7d6fc60b722c32d25bf188cd6b
Parents: 6cfd991
Author: Pavel Tupitsyn <ptupitsyn@apache.org>
Authored: Fri Jun 24 18:57:07 2016 +0300
Committer: Pavel Tupitsyn <ptupitsyn@apache.org>
Committed: Fri Jun 24 18:57:07 2016 +0300

----------------------------------------------------------------------
 .../GridAffinityFunctionContextImpl.java        |   9 +
 .../affinity/PlatformAffinityFunction.java      | 242 ++++++++++++++++
 .../callback/PlatformCallbackGateway.java       |  87 ++++++
 .../callback/PlatformCallbackUtils.java         |  46 +++
 .../utils/PlatformConfigurationUtils.java       |  13 +-
 .../platforms/cpp/jni/include/ignite/jni/java.h |  19 +-
 modules/platforms/cpp/jni/src/java.cpp          |  35 ++-
 .../Apache.Ignite.Core.Tests.csproj             |   5 +-
 .../Cache/Affinity/AffinityFieldTest.cs         | 199 +++++++++++++
 .../Cache/Affinity/AffinityFunctionTest.cs      | 282 +++++++++++++++++++
 .../Cache/Affinity/AffinityTest.cs              | 138 +++++++++
 .../Cache/CacheAffinityFieldTest.cs             | 199 -------------
 .../Cache/CacheAffinityTest.cs                  | 139 ---------
 .../Cache/CacheConfigurationTest.cs             |   6 +-
 .../native-client-test-cache-affinity.xml       |   2 +-
 .../IgniteConfigurationSerializerTest.cs        |  14 +-
 .../Apache.Ignite.Core.Tests/TestRunner.cs      |   6 +-
 .../Apache.Ignite.Core.csproj                   |   2 +
 .../Cache/Affinity/AffinityFunctionBase.cs      | 102 ++++++-
 .../Cache/Affinity/AffinityFunctionContext.cs   | 116 ++++++++
 .../Cache/Affinity/AffinityTopologyVersion.cs   | 138 +++++++++
 .../Cache/Affinity/IAffinityFunction.cs         |  55 +++-
 .../Cache/Configuration/CacheConfiguration.cs   |   9 +-
 .../Apache.Ignite.Core/Events/EventBase.cs      |   2 +-
 .../Apache.Ignite.Core/Events/EventReader.cs    |   8 +-
 .../Apache.Ignite.Core/IgniteConfiguration.cs   |   1 +
 .../IgniteConfigurationSection.xsd              |   4 +-
 .../dotnet/Apache.Ignite.Core/Ignition.cs       |   4 +-
 .../Impl/Unmanaged/UnmanagedCallbackHandlers.cs |   6 +
 .../Impl/Unmanaged/UnmanagedCallbacks.cs        | 120 +++++++-
 .../dotnet/Apache.Ignite.sln.DotSettings        |   7 +-
 31 files changed, 1629 insertions(+), 386 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java
index 6c97efd..e2bb99d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java
@@ -80,4 +80,13 @@ public class GridAffinityFunctionContextImpl implements AffinityFunctionContext
     @Override public int backups() {
         return backups;
     }
+
+    /**
+     * Gets the previous assignment.
+     *
+     * @return Previous assignment
+     */
+    public List<List<ClusterNode>> prevAssignment() {
+        return prevAssignment;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
new file mode 100644
index 0000000..4da5e24
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache.affinity;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.cluster.IgniteClusterEx;
+import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.lifecycle.LifecycleAware;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Platform AffinityFunction.
+ */
+public class PlatformAffinityFunction implements AffinityFunction, Externalizable, LifecycleAware {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private Object userFunc;
+
+    /** */
+    private int partitions;
+
+    /** */
+    private transient Ignite ignite;
+
+    /** */
+    private transient PlatformContext ctx;
+
+    /** */
+    private transient long ptr;
+
+    /**
+     * Ctor for serialization.
+     *
+     */
+    public PlatformAffinityFunction() {
+        partitions = -1;
+    }
+
+    /**
+     * Ctor.
+     *
+     * @param func User function object.
+     * @param partitions Initial number of partitions.
+     */
+    public PlatformAffinityFunction(Object func, int partitions) {
+        userFunc = func;
+        this.partitions = partitions;
+    }
+
+    /** @return User function object. */
+    public Object getUserFunc() {
+        return userFunc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void reset() {
+        // No-op: userFunc is always in initial state (it is serialized only once on start).
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partitions() {
+        // Affinity function cannot return a different number of partitions,
+        // so we pass this value once from the platform.
+        assert partitions > 0;
+
+        return partitions;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int partition(Object key) {
+        assert ctx != null;
+        assert ptr != 0;
+
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+            BinaryRawWriterEx writer = ctx.writer(out);
+
+            writer.writeObject(key);
+
+            out.synchronize();
+
+            return ctx.gateway().affinityFunctionPartition(ptr, mem.pointer());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) {
+        assert ctx != null;
+        assert ptr != 0;
+        assert affCtx != null;
+
+        try (PlatformMemory outMem = ctx.memory().allocate()) {
+            try (PlatformMemory inMem = ctx.memory().allocate()) {
+                PlatformOutputStream out = outMem.output();
+                BinaryRawWriterEx writer = ctx.writer(out);
+
+                // Write previous assignment
+                List<List<ClusterNode>> prevAssignment = ((GridAffinityFunctionContextImpl)affCtx).prevAssignment();
+
+                if (prevAssignment == null)
+                    writer.writeInt(-1);
+                else {
+                    writer.writeInt(prevAssignment.size());
+
+                    for (List<ClusterNode> part : prevAssignment)
+                        ctx.writeNodes(writer, part);
+                }
+
+                // Write other props
+                writer.writeInt(affCtx.backups());
+                ctx.writeNodes(writer, affCtx.currentTopologySnapshot());
+                writer.writeLong(affCtx.currentTopologyVersion().topologyVersion());
+                writer.writeInt(affCtx.currentTopologyVersion().minorTopologyVersion());
+                ctx.writeEvent(writer, affCtx.discoveryEvent());
+
+                // Call platform
+                out.synchronize();
+                ctx.gateway().affinityFunctionAssignPartitions(ptr, outMem.pointer(), inMem.pointer());
+
+                PlatformInputStream in = inMem.input();
+                BinaryRawReaderEx reader = ctx.reader(in);
+
+                // Read result
+                int partCnt = in.readInt();
+                List<List<ClusterNode>> res = new ArrayList<>(partCnt);
+                IgniteClusterEx cluster = ctx.kernalContext().grid().cluster();
+
+                for (int i = 0; i < partCnt; i++) {
+                    int partSize = in.readInt();
+                    List<ClusterNode> part = new ArrayList<>(partSize);
+
+                    for (int j = 0; j < partSize; j++)
+                        part.add(cluster.node(reader.readUuid()));
+
+                    res.add(part);
+                }
+
+                return res;
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeNode(UUID nodeId) {
+        assert ctx != null;
+        assert ptr != 0;
+
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+            BinaryRawWriterEx writer = ctx.writer(out);
+
+            writer.writeUuid(nodeId);
+
+            out.synchronize();
+
+            ctx.gateway().affinityFunctionRemoveNode(ptr, mem.pointer());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(userFunc);
+        out.writeInt(partitions);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        userFunc = in.readObject();
+        partitions = in.readInt();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteException {
+        assert ignite != null;
+        ctx = PlatformUtils.platformContext(ignite);
+        assert ctx != null;
+
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+            BinaryRawWriterEx writer = ctx.writer(out);
+
+            writer.writeObject(userFunc);
+
+            out.synchronize();
+
+            ptr = ctx.gateway().affinityFunctionInit(mem.pointer());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() throws IgniteException {
+        assert ctx != null;
+
+        ctx.gateway().affinityFunctionDestroy(ptr);
+    }
+
+    /**
+     * Injects the Ignite.
+     *
+     * @param ignite Ignite.
+     */
+    @IgniteInstanceResource
+    private void setIgnite(Ignite ignite) {
+        this.ignite = ignite;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
index 3439f38..3708e8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
@@ -951,6 +951,93 @@ public class PlatformCallbackGateway {
     }
 
     /**
+     * Initializes affinity function.
+     *
+     * @param memPtr Pointer to a stream with serialized affinity function.
+     * @return Affinity function pointer.
+     */
+    public long affinityFunctionInit(long memPtr) {
+        enter();
+
+        try {
+            return PlatformCallbackUtils.affinityFunctionInit(envPtr, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Gets the partition from affinity function.
+     *
+     * @param ptr Affinity function pointer.
+     * @param memPtr Pointer to a stream with key object.
+     * @return Partition number for a given key.
+     */
+    public int affinityFunctionPartition(long ptr, long memPtr) {
+        enter();
+
+        try {
+            return PlatformCallbackUtils.affinityFunctionPartition(envPtr, ptr, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Assigns the affinity partitions.
+     *
+     * @param ptr Affinity function pointer.
+     * @param outMemPtr Pointer to a stream with affinity context.
+     * @param inMemPtr Pointer to a stream with result.
+     */
+    public void affinityFunctionAssignPartitions(long ptr, long outMemPtr, long inMemPtr){
+        enter();
+
+        try {
+            PlatformCallbackUtils.affinityFunctionAssignPartitions(envPtr, ptr, outMemPtr, inMemPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Removes the node from affinity function.
+     *
+     * @param ptr Affinity function pointer.
+     * @param memPtr Pointer to a stream with node id.
+     */
+    public void affinityFunctionRemoveNode(long ptr, long memPtr) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.affinityFunctionRemoveNode(envPtr, ptr, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Destroys the affinity function.
+     *
+     * @param ptr Affinity function pointer.
+     */
+    public void affinityFunctionDestroy(long ptr) {
+        if (!lock.enterBusy())
+            return;  // skip: destroy is not necessary during shutdown.
+
+        try {
+            PlatformCallbackUtils.affinityFunctionDestroy(envPtr, ptr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
      * Enter gateway.
      */
     protected void enter() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
index f7d6586..d19782d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
@@ -496,6 +496,52 @@ public class PlatformCallbackUtils {
     static native void onClientReconnected(long envPtr, boolean clusterRestarted);
 
     /**
+     * Initializes affinity function.
+     *
+     * @param envPtr Environment pointer.
+     * @param memPtr Pointer to a stream with serialized affinity function.
+     * @return Affinity function pointer.
+     */
+    static native long affinityFunctionInit(long envPtr, long memPtr);
+
+    /**
+     * Gets the partition from affinity function.
+     *
+     * @param envPtr Environment pointer.
+     * @param ptr Affinity function pointer.
+     * @param memPtr Pointer to a stream with key object.
+     * @return Partition number for a given key.
+     */
+    static native int affinityFunctionPartition(long envPtr, long ptr, long memPtr);
+
+    /**
+     * Assigns the affinity partitions.
+     *
+     * @param envPtr Environment pointer.
+     * @param ptr Affinity function pointer.
+     * @param outMemPtr Pointer to a stream with affinity context.
+     * @param inMemPtr Pointer to a stream with result.
+     */
+    static native void affinityFunctionAssignPartitions(long envPtr, long ptr, long outMemPtr, long inMemPtr);
+
+    /**
+     * Removes the node from affinity function.
+     *
+     * @param envPtr Environment pointer.
+     * @param ptr Affinity function pointer.
+     * @param memPtr Pointer to a stream with node id.
+     */
+    static native void affinityFunctionRemoveNode(long envPtr, long ptr, long memPtr);
+
+    /**
+     * Destroys the affinity function.
+     *
+     * @param envPtr Environment pointer.
+     * @param ptr Affinity function pointer.
+     */
+    static native void affinityFunctionDestroy(long envPtr, long ptr);
+
+    /**
      * Private constructor.
      */
     private PlatformCallbackUtils() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index 29b6a70..7353f08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@ -41,6 +41,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.binary.*;
+import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunction;
 import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryConfiguration;
 import org.apache.ignite.platform.dotnet.PlatformDotNetBinaryTypeConfiguration;
 import org.apache.ignite.platform.dotnet.PlatformDotNetCacheStoreFactoryNative;
@@ -237,7 +238,7 @@ public class PlatformConfigurationUtils {
      * @param in Stream.
      * @return Affinity function.
      */
-    private static AffinityFunction readAffinityFunction(BinaryRawReader in) {
+    private static AffinityFunction readAffinityFunction(BinaryRawReaderEx in) {
         byte plcTyp = in.readByte();
 
         switch (plcTyp) {
@@ -255,6 +256,9 @@ public class PlatformConfigurationUtils {
                 f.setExcludeNeighbors(in.readBoolean());
                 return f;
             }
+            case 3: {
+                return new PlatformAffinityFunction(in.readObjectDetached(), in.readInt());
+            }
             default:
                 assert false;
         }
@@ -296,6 +300,13 @@ public class PlatformConfigurationUtils {
             out.writeInt(f0.getPartitions());
             out.writeBoolean(f0.isExcludeNeighbors());
         }
+        else if (f instanceof PlatformAffinityFunction) {
+            out.writeByte((byte)3);
+
+            PlatformAffinityFunction f0 = (PlatformAffinityFunction)f;
+            out.writeObject(f0.getUserFunc());
+            out.writeInt(f.partitions());
+        }
         else {
             out.writeByte((byte)0);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/cpp/jni/include/ignite/jni/java.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/include/ignite/jni/java.h b/modules/platforms/cpp/jni/include/ignite/jni/java.h
index 3d45ec0..98779c4 100644
--- a/modules/platforms/cpp/jni/include/ignite/jni/java.h
+++ b/modules/platforms/cpp/jni/include/ignite/jni/java.h
@@ -21,7 +21,6 @@
 #include <jni.h>
 
 #include "ignite/common/common.h"
-#include "ignite/ignite_error.h"
 
 namespace ignite
 {
@@ -101,6 +100,12 @@ namespace ignite
             typedef void(JNICALL *OnClientDisconnectedHandler)(void* target);
             typedef void(JNICALL *OnClientReconnectedHandler)(void* target, unsigned char clusterRestarted);
 
+            typedef long long(JNICALL *AffinityFunctionInitHandler)(void* target, long long memPtr);
+            typedef int(JNICALL *AffinityFunctionPartitionHandler)(void* target, long long ptr, long long memPtr);
+            typedef void(JNICALL *AffinityFunctionAssignPartitionsHandler)(void* target, long long ptr, long long inMemPtr, long long outMemPtr);
+            typedef void(JNICALL *AffinityFunctionRemoveNodeHandler)(void* target, long long ptr, long long memPtr);
+            typedef void(JNICALL *AffinityFunctionDestroyHandler)(void* target, long long ptr);
+
             /**
              * JNI handlers holder.
              */
@@ -178,6 +183,12 @@ namespace ignite
 
                 OnClientDisconnectedHandler onClientDisconnected;
                 OnClientReconnectedHandler onClientReconnected;
+                
+                AffinityFunctionInitHandler affinityFunctionInit;
+                AffinityFunctionPartitionHandler affinityFunctionPartition;
+                AffinityFunctionAssignPartitionsHandler affinityFunctionAssignPartitions;
+                AffinityFunctionRemoveNodeHandler affinityFunctionRemoveNode;
+                AffinityFunctionDestroyHandler affinityFunctionDestroy;
             };
 
             /**
@@ -740,6 +751,12 @@ namespace ignite
 
             JNIEXPORT void JNICALL JniOnClientDisconnected(JNIEnv *env, jclass cls, jlong envPtr);
             JNIEXPORT void JNICALL JniOnClientReconnected(JNIEnv *env, jclass cls, jlong envPtr, jboolean clusterRestarted);
+
+            JNIEXPORT jlong JNICALL JniAffinityFunctionInit(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr);
+            JNIEXPORT jint JNICALL JniAffinityFunctionPartition(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr);
+            JNIEXPORT void JNICALL JniAffinityFunctionAssignPartitions(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong inMemPtr, jlong outMemPtr);
+            JNIEXPORT void JNICALL JniAffinityFunctionRemoveNode(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr);
+            JNIEXPORT void JNICALL JniAffinityFunctionDestroy(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/cpp/jni/src/java.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/src/java.cpp b/modules/platforms/cpp/jni/src/java.cpp
index 66be0ca..577ee26 100644
--- a/modules/platforms/cpp/jni/src/java.cpp
+++ b/modules/platforms/cpp/jni/src/java.cpp
@@ -22,6 +22,7 @@
 #include "ignite/jni/utils.h"
 #include "ignite/common/concurrent.h"
 #include "ignite/jni/java.h"
+#include <ignite/ignite_error.h>
 
 #define IGNITE_SAFE_PROC_NO_ARG(jniEnv, envPtr, type, field) { \
     JniHandlers* hnds = reinterpret_cast<JniHandlers*>(envPtr); \
@@ -362,6 +363,12 @@ namespace ignite
             JniMethod M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_DISCONNECTED = JniMethod("onClientDisconnected", "(J)V", true);
             JniMethod M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_RECONNECTED = JniMethod("onClientReconnected", "(JZ)V", true);
 
+            JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT = JniMethod("affinityFunctionInit", "(JJ)J", true);
+            JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_PARTITION = JniMethod("affinityFunctionPartition", "(JJJ)I", true);
+            JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_ASSIGN_PARTITIONS = JniMethod("affinityFunctionAssignPartitions", "(JJJJ)V", true);
+            JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_REMOVE_NODE = JniMethod("affinityFunctionRemoveNode", "(JJJ)V", true);
+            JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_DESTROY = JniMethod("affinityFunctionDestroy", "(JJ)V", true);
+
             const char* C_PLATFORM_UTILS = "org/apache/ignite/internal/processors/platform/utils/PlatformUtils";
             JniMethod M_PLATFORM_UTILS_REALLOC = JniMethod("reallocate", "(JI)V", true);
             JniMethod M_PLATFORM_UTILS_ERR_DATA = JniMethod("errorData", "(Ljava/lang/Throwable;)[B", true);
@@ -821,7 +828,7 @@ namespace ignite
 
             void RegisterNatives(JNIEnv* env) {
                 {
-					JNINativeMethod methods[54];
+					JNINativeMethod methods[59];
 
                     int idx = 0;
 
@@ -898,6 +905,12 @@ namespace ignite
                     AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_DISCONNECTED, reinterpret_cast<void*>(JniOnClientDisconnected));
                     AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_RECONNECTED, reinterpret_cast<void*>(JniOnClientReconnected));
 
+                    AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT, reinterpret_cast<void*>(JniAffinityFunctionInit));
+                    AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_PARTITION, reinterpret_cast<void*>(JniAffinityFunctionPartition));
+                    AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_ASSIGN_PARTITIONS, reinterpret_cast<void*>(JniAffinityFunctionAssignPartitions));
+                    AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_REMOVE_NODE, reinterpret_cast<void*>(JniAffinityFunctionRemoveNode));
+                    AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_DESTROY, reinterpret_cast<void*>(JniAffinityFunctionDestroy));
+
                     jint res = env->RegisterNatives(FindClass(env, C_PLATFORM_CALLBACK_UTILS), methods, idx);
 
                     if (res != JNI_OK)
@@ -2833,6 +2846,26 @@ namespace ignite
             JNIEXPORT void JNICALL JniOnClientReconnected(JNIEnv *env, jclass cls, jlong envPtr, jboolean clusterRestarted) {
                 IGNITE_SAFE_PROC(env, envPtr, OnClientReconnectedHandler, onClientReconnected, clusterRestarted);
             }
+            
+            JNIEXPORT jlong JNICALL JniAffinityFunctionInit(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr) {
+                IGNITE_SAFE_FUNC(env, envPtr, AffinityFunctionInitHandler, affinityFunctionInit, memPtr);
+            }
+
+            JNIEXPORT jint JNICALL JniAffinityFunctionPartition(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr) {
+                IGNITE_SAFE_FUNC(env, envPtr, AffinityFunctionPartitionHandler, affinityFunctionPartition, ptr, memPtr);
+            }
+            
+            JNIEXPORT void JNICALL JniAffinityFunctionAssignPartitions(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong inMemPtr, jlong outMemPtr) {
+                IGNITE_SAFE_PROC(env, envPtr, AffinityFunctionAssignPartitionsHandler, affinityFunctionAssignPartitions, ptr, inMemPtr, outMemPtr);
+            }
+
+            JNIEXPORT void JNICALL JniAffinityFunctionRemoveNode(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr) {
+                IGNITE_SAFE_PROC(env, envPtr, AffinityFunctionRemoveNodeHandler, affinityFunctionRemoveNode, ptr, memPtr);
+            }
+
+            JNIEXPORT void JNICALL JniAffinityFunctionDestroy(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr) {
+                IGNITE_SAFE_PROC(env, envPtr, AffinityFunctionDestroyHandler, affinityFunctionDestroy, ptr);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
index 1a367d4..15e46ae 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj
@@ -59,13 +59,14 @@
     <Compile Include="Binary\BinaryCompactFooterInteropTest.cs" />
     <Compile Include="Binary\BinarySelfTestFullFooter.cs" />
     <Compile Include="Binary\BinaryStringTest.cs" />
-    <Compile Include="Cache\CacheAffinityFieldTest.cs" />
+    <Compile Include="Cache\Affinity\AffinityFieldTest.cs" />
+    <Compile Include="Cache\Affinity\AffinityFunctionTest.cs" />
     <Compile Include="Cache\CacheConfigurationTest.cs" />
     <Compile Include="Cache\CacheDynamicStartTest.cs" />
     <Compile Include="Cache\CacheNearTest.cs" />
     <Compile Include="Cache\CacheTestAsyncWrapper.cs" />
     <Compile Include="Cache\CacheAbstractTest.cs" />
-    <Compile Include="Cache\CacheAffinityTest.cs" />
+    <Compile Include="Cache\Affinity\AffinityTest.cs" />
     <Compile Include="Cache\CacheEntryTest.cs" />
     <Compile Include="Cache\CacheForkedTest.cs" />
     <Compile Include="Cache\CacheLocalAtomicTest.cs" />

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFieldTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFieldTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFieldTest.cs
new file mode 100644
index 0000000..ceb04cd
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFieldTest.cs
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// ReSharper disable UnusedAutoPropertyAccessor.Local
+namespace Apache.Ignite.Core.Tests.Cache.Affinity
+{
+    using System;
+    using System.Collections.Generic;
+    using System.Linq;
+    using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Cache;
+    using Apache.Ignite.Core.Cache.Affinity;
+    using Apache.Ignite.Core.Cache.Configuration;
+    using Apache.Ignite.Core.Tests.Compute;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Tests custom affinity mapping.
+    /// </summary>
+    public class AffinityFieldTest
+    {
+        /** Cache created on the first grid in FixtureSetUp. */
+        private ICache<object, string> _cache1;
+
+        /** Null-named cache obtained from the second grid ("grid2") in FixtureSetUp. */
+        private ICache<object, string> _cache2;
+
+        /// <summary>
+        /// Fixture set up.
+        /// </summary>
+        [TestFixtureSetUp]
+        public void FixtureSetUp()
+        {
+            var grid1 = Ignition.Start(GetConfig());
+            var grid2 = Ignition.Start(GetConfig("grid2"));
+
+            _cache1 = grid1.CreateCache<object, string>(new CacheConfiguration
+            {
+                CacheMode = CacheMode.Partitioned
+            });
+            _cache2 = grid2.GetCache<object, string>(null);
+        }
+
+        /// <summary>
+        /// Fixture tear down.
+        /// </summary>
+        [TestFixtureTearDown]
+        public void FixtureTearDown()
+        {
+            Ignition.StopAll(true);
+        }
+
+        /// <summary>
+        /// Tests the metadata.
+        /// </summary>
+        [Test]
+        public void TestMetadata()
+        {
+            // Put keys to update meta
+            _cache1.Put(new CacheKey(), string.Empty);
+            _cache1.Put(new CacheKeyAttr(), string.Empty);
+            _cache1.Put(new CacheKeyAttrOverride(), string.Empty);
+
+            // Verify
+            foreach (var type in new[] { typeof(CacheKey) , typeof(CacheKeyAttr), typeof(CacheKeyAttrOverride)})
+            {
+                Assert.AreEqual("AffinityKey", _cache1.Ignite.GetBinary().GetBinaryType(type).AffinityKeyFieldName);
+                Assert.AreEqual("AffinityKey", _cache2.Ignite.GetBinary().GetBinaryType(type).AffinityKeyFieldName);
+            }
+        }
+
+        /// <summary>
+        /// Tests that keys are located properly in cache partitions.
+        /// </summary>
+        [Test]
+        public void TestKeyLocation()
+        {
+            TestKeyLocation0((key, affKey) => new CacheKey {Key = key, AffinityKey = affKey});
+            TestKeyLocation0((key, affKey) => new CacheKeyAttr {Key = key, AffinityKey = affKey});
+            TestKeyLocation0((key, affKey) => new CacheKeyAttrOverride {Key = key, AffinityKey = affKey});
+        }
+
+        /// <summary>
+        /// Tests the <see cref="AffinityKey"/> class.
+        /// </summary>
+        [Test]
+        public void TestAffinityKeyClass()
+        {
+            // Check location
+            TestKeyLocation0((key, affKey) => new AffinityKey(key, affKey));
+
+            // Check meta
+            Assert.AreEqual("affKey",
+                _cache1.Ignite.GetBinary().GetBinaryType(typeof (AffinityKey)).AffinityKeyFieldName);
+        }
+
+        /// <summary>
+        /// Tests <see cref="AffinityKey"/> class interoperability.
+        /// </summary>
+        [Test]
+        public void TestInterop()
+        {
+            var affKey = _cache1.Ignite.GetCompute()
+                .ExecuteJavaTask<AffinityKey>(ComputeApiTest.EchoTask, ComputeApiTest.EchoTypeAffinityKey);
+
+            Assert.AreEqual("interopAffinityKey", affKey.Key);
+        }
+
+        /// <summary>
+        /// Tests the key location.
+        /// </summary>
+        private void TestKeyLocation0<T>(Func<int, int, T> ctor)
+        {
+            var aff = _cache1.Ignite.GetAffinity(_cache1.Name);
+
+            foreach (var cache in new[] { _cache1, _cache2 })
+            {
+                cache.RemoveAll();
+
+                var localNode = cache.Ignite.GetCluster().GetLocalNode();
+
+                var localKeys = Enumerable.Range(1, int.MaxValue)
+                    .Where(x => aff.MapKeyToNode(x).Id == localNode.Id).Take(100).ToArray();
+
+                for (int index = 0; index < localKeys.Length; index++)
+                {
+                    var cacheKey = ctor(index, localKeys[index]);
+
+                    cache.Put(cacheKey, index.ToString());
+
+                    // Verify that key is stored locally according to AffinityKeyFieldName
+                    Assert.AreEqual(index.ToString(), cache.LocalPeek(cacheKey, CachePeekMode.Primary));
+
+                    // Other cache does not have this key locally
+                    var otherCache = cache == _cache1 ? _cache2 : _cache1;
+                    Assert.Throws<KeyNotFoundException>(() => otherCache.LocalPeek(cacheKey, CachePeekMode.All));
+                }
+            }
+        }
+
+        /// <summary>
+        /// Gets the configuration.
+        /// </summary>
+        private static IgniteConfiguration GetConfig(string gridName = null)
+        {
+            return new IgniteConfiguration(TestUtils.GetTestConfiguration())
+            {
+                GridName = gridName,
+                BinaryConfiguration = new BinaryConfiguration
+                {
+                    TypeConfigurations = new[]
+                    {
+                        new BinaryTypeConfiguration(typeof (CacheKey))
+                        {
+                            AffinityKeyFieldName = "AffinityKey"
+                        },
+                        new BinaryTypeConfiguration(typeof(CacheKeyAttr)),
+                        new BinaryTypeConfiguration(typeof (CacheKeyAttrOverride))
+                        {
+                            AffinityKeyFieldName = "AffinityKey"
+                        }
+                    }
+                },
+            };
+        }
+
+        private class CacheKey
+        {
+            public int Key { get; set; }
+            public int AffinityKey { get; set; }
+        }
+
+        private class CacheKeyAttr
+        {
+            public int Key { get; set; }
+            [AffinityKeyMapped] public int AffinityKey { get; set; }
+        }
+
+        private class CacheKeyAttrOverride
+        {
+            [AffinityKeyMapped] public int Key { get; set; }
+            public int AffinityKey { get; set; }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionTest.cs
new file mode 100644
index 0000000..70e0d78
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionTest.cs
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Affinity
+{
+    using System;
+    using System.Collections.Concurrent;
+    using System.Collections.Generic;
+    using System.Linq;
+    using Apache.Ignite.Core.Cache;
+    using Apache.Ignite.Core.Cache.Affinity;
+    using Apache.Ignite.Core.Cache.Configuration;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Resource;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Tests user-defined <see cref="IAffinityFunction"/>
+    /// </summary>
+    public class AffinityFunctionTest
+    {
+        /** First grid; started with the statically configured cache. */
+        private IIgnite _ignite;
+
+        /** Second grid, started under the name "grid2". */
+        private IIgnite _ignite2;
+
+        /** Name of the statically configured cache. */
+        private const string CacheName = "cache";
+
+        /** Partition count reported by SimpleAffinityFunction. */
+        private const int PartitionCount = 10;
+
+        /** Node IDs passed to SimpleAffinityFunction.RemoveNode. */
+        private static readonly ConcurrentBag<Guid> RemovedNodes = new ConcurrentBag<Guid>();
+
+        /** Contexts passed to SimpleAffinityFunction.AssignPartitions. */
+        private static readonly ConcurrentBag<AffinityFunctionContext> Contexts =
+            new ConcurrentBag<AffinityFunctionContext>();
+
+        /// <summary>
+        /// Fixture set up.
+        /// </summary>
+        [TestFixtureSetUp]
+        public void FixtureSetUp()
+        {
+            var cfg = new IgniteConfiguration(TestUtils.GetTestConfiguration())
+            {
+                CacheConfiguration = new[]
+                {
+                    new CacheConfiguration(CacheName)
+                    {
+                        AffinityFunction = new SimpleAffinityFunction(),
+                        Backups = 7
+                    }
+                }
+            };
+
+            _ignite = Ignition.Start(cfg);
+
+            _ignite2 = Ignition.Start(new IgniteConfiguration(TestUtils.GetTestConfiguration()) {GridName = "grid2"});
+        }
+
+        /// <summary>
+        /// Fixture tear down.
+        /// </summary>
+        [TestFixtureTearDown]
+        public void FixtureTearDown()
+        {
+            // Check that affinity handles are present
+            TestUtils.AssertHandleRegistryHasItems(_ignite, _ignite.GetCacheNames().Count, 0);
+            TestUtils.AssertHandleRegistryHasItems(_ignite2, _ignite.GetCacheNames().Count, 0);
+
+            // Destroy all caches
+            _ignite.GetCacheNames().ToList().ForEach(_ignite.DestroyCache);
+            Assert.AreEqual(0, _ignite.GetCacheNames().Count);
+
+            // Check that all affinity functions got released
+            TestUtils.AssertHandleRegistryIsEmpty(1000, _ignite, _ignite2);
+
+            Ignition.StopAll(true);
+        }
+
+        /// <summary>
+        /// Tests the static cache.
+        /// </summary>
+        [Test]
+        public void TestStaticCache()
+        {
+            VerifyCacheAffinity(_ignite.GetCache<int, int>(CacheName));
+            VerifyCacheAffinity(_ignite2.GetCache<int, int>(CacheName));
+        }
+
+        /// <summary>
+        /// Tests the dynamic cache.
+        /// </summary>
+        [Test]
+        public void TestDynamicCache()
+        {
+            const string cacheName = "dynCache";
+
+            VerifyCacheAffinity(_ignite.CreateCache<int, int>(new CacheConfiguration(cacheName)
+            {
+                AffinityFunction = new SimpleAffinityFunction(),
+                Backups = 5
+            }));
+
+            VerifyCacheAffinity(_ignite2.GetCache<int, int>(cacheName));
+            
+            // Verify context for new cache
+            var lastCtx = Contexts.Where(x => x.GetPreviousAssignment(1) == null)
+                .OrderBy(x => x.DiscoveryEvent.Timestamp).Last();
+
+            Assert.AreEqual(new AffinityTopologyVersion(2, 1), lastCtx.CurrentTopologyVersion);
+            Assert.AreEqual(5, lastCtx.Backups);
+
+            // Verify context for old cache
+            var ctx = Contexts.Where(x => x.GetPreviousAssignment(1) != null)
+                .OrderBy(x => x.DiscoveryEvent.Timestamp).Last();
+
+            Assert.AreEqual(new AffinityTopologyVersion(2, 0), ctx.CurrentTopologyVersion);
+            Assert.AreEqual(7, ctx.Backups);
+            CollectionAssert.AreEquivalent(_ignite.GetCluster().GetNodes(), ctx.CurrentTopologySnapshot);
+
+            var evt = ctx.DiscoveryEvent;
+            CollectionAssert.AreEquivalent(_ignite.GetCluster().GetNodes(), evt.TopologyNodes);
+            CollectionAssert.Contains(_ignite.GetCluster().GetNodes(), evt.EventNode);
+            Assert.AreEqual(_ignite.GetCluster().TopologyVersion, evt.TopologyVersion);
+
+            var firstTop = _ignite.GetCluster().GetTopology(1);
+            var parts = Enumerable.Range(0, PartitionCount).ToArray();
+            CollectionAssert.AreEqual(parts.Select(x => firstTop), parts.Select(x => ctx.GetPreviousAssignment(x)));
+        }
+
+        /// <summary>
+        /// Verifies the cache affinity.
+        /// </summary>
+        private static void VerifyCacheAffinity(ICache<int, int> cache)
+        {
+            Assert.IsInstanceOf<SimpleAffinityFunction>(cache.GetConfiguration().AffinityFunction);
+
+            var aff = cache.Ignite.GetAffinity(cache.Name);
+            Assert.AreEqual(PartitionCount, aff.Partitions);
+
+            for (int i = 0; i < 100; i++)
+                Assert.AreEqual(i % PartitionCount, aff.GetPartition(i));
+        }
+
+        /// <summary>
+        /// Tests the RemoveNode method.
+        /// </summary>
+        [Test]
+        public void TestRemoveNode()
+        {
+            Assert.AreEqual(0, RemovedNodes.Count);
+
+            Guid expectedNodeId;
+
+            using (var ignite = Ignition.Start(new IgniteConfiguration(TestUtils.GetTestConfiguration())
+            {
+                GridName = "grid3",
+            }))
+            {
+                expectedNodeId = ignite.GetCluster().GetLocalNode().Id;
+                Assert.AreEqual(0, RemovedNodes.Count);
+                VerifyCacheAffinity(ignite.GetCache<int, int>(CacheName));
+            }
+
+            // Called on both nodes
+            TestUtils.WaitForCondition(() => RemovedNodes.Count > 0, 3000);
+            Assert.AreEqual(expectedNodeId, RemovedNodes.Distinct().Single());
+        }
+
+        /// <summary>
+        /// Tests that a non-serializable affinity function is rejected with a descriptive error.
+        /// </summary>
+        [Test]
+        public void TestNonSerializableFunction()
+        {
+            var ex = Assert.Throws<IgniteException>(() =>
+                _ignite.CreateCache<int, int>(new CacheConfiguration("failCache")
+                {
+                    AffinityFunction = new NonSerializableAffinityFunction()
+                }));
+
+            Assert.AreEqual("AffinityFunction should be serializable.", ex.Message);
+        }
+
+        /// <summary>
+        /// Tests the exception propagation.
+        /// </summary>
+        [Test]
+        public void TestExceptionInFunction()
+        {
+            var cache = _ignite.CreateCache<int, int>(new CacheConfiguration("failCache2")
+            {
+                AffinityFunction = new FailInGetPartitionAffinityFunction()
+            });
+
+            var ex = Assert.Throws<CacheException>(() => cache.Put(1, 2));
+            Assert.AreEqual("User error", ex.InnerException.Message);
+        }
+
+        [Serializable]
+        private class SimpleAffinityFunction : IAffinityFunction
+        {
+            #pragma warning disable 649  // field is never assigned
+            [InstanceResource] private readonly IIgnite _ignite;
+
+            public int Partitions
+            {
+                get { return PartitionCount; }
+            }
+
+            public int GetPartition(object key)
+            {
+                Assert.IsNotNull(_ignite);
+
+                return (int) key % Partitions;
+            }
+
+            public void RemoveNode(Guid nodeId)
+            {
+                RemovedNodes.Add(nodeId);
+            }
+
+            public IEnumerable<IEnumerable<IClusterNode>> AssignPartitions(AffinityFunctionContext context)
+            {
+                Assert.IsNotNull(_ignite);
+
+                Contexts.Add(context);
+
+                // All partitions are the same
+                return Enumerable.Range(0, Partitions).Select(x => context.CurrentTopologySnapshot);
+            }
+        }
+
+        private class NonSerializableAffinityFunction : SimpleAffinityFunction
+        {
+            // No-op.
+        }
+
+        [Serializable]
+        private class FailInGetPartitionAffinityFunction : IAffinityFunction
+        {
+            public int Partitions
+            {
+                get { return 5; }
+            }
+
+            public int GetPartition(object key)
+            {
+                throw new ArithmeticException("User error");
+            }
+
+            public void RemoveNode(Guid nodeId)
+            {
+                // No-op.
+            }
+
+            public IEnumerable<IEnumerable<IClusterNode>> AssignPartitions(AffinityFunctionContext context)
+            {
+                return Enumerable.Range(0, Partitions).Select(x => context.CurrentTopologySnapshot);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityTest.cs
new file mode 100644
index 0000000..e38668b
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityTest.cs
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Tests.Cache.Affinity
+{
+    using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Cache;
+    using Apache.Ignite.Core.Cluster;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Affinity key tests.
+    /// </summary>
+    public sealed class AffinityTest
+    {
+        /// <summary>
+        /// Fixture set up: starts three grids, "grid-0" through "grid-2", from the Spring affinity config.
+        /// </summary>
+        [TestFixtureSetUp]
+        public void StartGrids()
+        {
+            TestUtils.KillProcesses();
+
+            for (int i = 0; i < 3; i++)
+            {
+                var cfg = new IgniteConfiguration
+                {
+                    JvmClasspath = TestUtils.CreateTestClasspath(),
+                    JvmOptions = TestUtils.TestJavaOptions(),
+                    SpringConfigUrl = "config\\native-client-test-cache-affinity.xml",
+                    GridName = "grid-" + i
+                };
+
+                Ignition.Start(cfg);
+            }
+        }
+
+        /// <summary>
+        /// Tear-down routine.
+        /// </summary>
+        [TestFixtureTearDown]
+        public void StopGrids()
+        {
+            Ignition.StopAll(true);
+        }
+
+        /// <summary>
+        /// Test affinity key.
+        /// </summary>
+        [Test]
+        public void TestAffinity()
+        {
+            IIgnite g = Ignition.GetIgnite("grid-0");
+
+            ICacheAffinity aff = g.GetAffinity(null);
+
+            IClusterNode node = aff.MapKeyToNode(new AffinityTestKey(0, 1));
+
+            for (int i = 0; i < 10; i++)
+                Assert.AreEqual(node.Id, aff.MapKeyToNode(new AffinityTestKey(i, 1)).Id);
+        }
+
+        /// <summary>
+        /// Test affinity with binary flag.
+        /// </summary>
+        [Test]
+        public void TestAffinityBinary()
+        {
+            IIgnite g = Ignition.GetIgnite("grid-0");
+
+            ICacheAffinity aff = g.GetAffinity(null);  
+
+            IBinaryObject affKey = g.GetBinary().ToBinary<IBinaryObject>(new AffinityTestKey(0, 1));
+
+            IClusterNode node = aff.MapKeyToNode(affKey);
+
+            for (int i = 0; i < 10; i++)
+            {
+                IBinaryObject otherAffKey =
+                    g.GetBinary().ToBinary<IBinaryObject>(new AffinityTestKey(i, 1));
+
+                Assert.AreEqual(node.Id, aff.MapKeyToNode(otherAffKey).Id);
+            }
+        }
+
+        /// <summary>
+        /// Affinity key.
+        /// </summary>
+        public class AffinityTestKey
+        {
+            /** ID. */
+            private readonly int _id;
+
+            /** Affinity key. */
+            // ReSharper disable once NotAccessedField.Local
+            private readonly int _affKey;
+
+            /// <summary>
+            /// Constructor.
+            /// </summary>
+            /// <param name="id">ID.</param>
+            /// <param name="affKey">Affinity key.</param>
+            public AffinityTestKey(int id, int affKey)
+            {
+                _id = id;
+                _affKey = affKey;
+            }
+
+            /** <inheritdoc /> */
+            public override bool Equals(object obj)
+            {
+                var other = obj as AffinityTestKey;
+
+                return other != null && _id == other._id;
+            }
+
+            /** <inheritdoc /> */
+            public override int GetHashCode()
+            {
+                return _id;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityFieldTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityFieldTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityFieldTest.cs
deleted file mode 100644
index 4fb7738..0000000
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityFieldTest.cs
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// ReSharper disable UnusedAutoPropertyAccessor.Local
-namespace Apache.Ignite.Core.Tests.Cache
-{
-    using System;
-    using System.Collections.Generic;
-    using System.Linq;
-    using Apache.Ignite.Core.Binary;
-    using Apache.Ignite.Core.Cache;
-    using Apache.Ignite.Core.Cache.Affinity;
-    using Apache.Ignite.Core.Cache.Configuration;
-    using Apache.Ignite.Core.Tests.Compute;
-    using NUnit.Framework;
-
-    /// <summary>
-    /// Tests custom affinity mapping.
-    /// </summary>
-    public class CacheAffinityFieldTest
-    {
-        /** */
-        private ICache<object, string> _cache1;
-
-        /** */
-        private ICache<object, string> _cache2;
-
-        /// <summary>
-        /// Fixture set up.
-        /// </summary>
-        [TestFixtureSetUp]
-        public void FixtureSetUp()
-        {
-            var grid1 = Ignition.Start(GetConfig());
-            var grid2 = Ignition.Start(GetConfig("grid2"));
-
-            _cache1 = grid1.CreateCache<object, string>(new CacheConfiguration
-            {
-                CacheMode = CacheMode.Partitioned
-            });
-            _cache2 = grid2.GetCache<object, string>(null);
-        }
-
-        /// <summary>
-        /// Fixture tear down.
-        /// </summary>
-        [TestFixtureTearDown]
-        public void FixtureTearDown()
-        {
-            Ignition.StopAll(true);
-        }
-
-        /// <summary>
-        /// Tests the metadata.
-        /// </summary>
-        [Test]
-        public void TestMetadata()
-        {
-            // Put keys to update meta
-            _cache1.Put(new CacheKey(), string.Empty);
-            _cache1.Put(new CacheKeyAttr(), string.Empty);
-            _cache1.Put(new CacheKeyAttrOverride(), string.Empty);
-
-            // Verify
-            foreach (var type in new[] { typeof(CacheKey) , typeof(CacheKeyAttr), typeof(CacheKeyAttrOverride)})
-            {
-                Assert.AreEqual("AffinityKey", _cache1.Ignite.GetBinary().GetBinaryType(type).AffinityKeyFieldName);
-                Assert.AreEqual("AffinityKey", _cache2.Ignite.GetBinary().GetBinaryType(type).AffinityKeyFieldName);
-            }
-        }
-
-        /// <summary>
-        /// Tests that keys are located properly in cache partitions.
-        /// </summary>
-        [Test]
-        public void TestKeyLocation()
-        {
-            TestKeyLocation0((key, affKey) => new CacheKey {Key = key, AffinityKey = affKey});
-            TestKeyLocation0((key, affKey) => new CacheKeyAttr {Key = key, AffinityKey = affKey});
-            TestKeyLocation0((key, affKey) => new CacheKeyAttrOverride {Key = key, AffinityKey = affKey});
-        }
-
-        /// <summary>
-        /// Tests the <see cref="AffinityKey"/> class.
-        /// </summary>
-        [Test]
-        public void TestAffinityKeyClass()
-        {
-            // Check location
-            TestKeyLocation0((key, affKey) => new AffinityKey(key, affKey));
-
-            // Check meta
-            Assert.AreEqual("affKey",
-                _cache1.Ignite.GetBinary().GetBinaryType(typeof (AffinityKey)).AffinityKeyFieldName);
-        }
-
-        /// <summary>
-        /// Tests <see cref="AffinityKey"/> class interoperability.
-        /// </summary>
-        [Test]
-        public void TestInterop()
-        {
-            var affKey = _cache1.Ignite.GetCompute()
-                .ExecuteJavaTask<AffinityKey>(ComputeApiTest.EchoTask, ComputeApiTest.EchoTypeAffinityKey);
-
-            Assert.AreEqual("interopAffinityKey", affKey.Key);
-        }
-
-        /// <summary>
-        /// Tests the key location.
-        /// </summary>
-        private void TestKeyLocation0<T>(Func<int, int, T> ctor)
-        {
-            var aff = _cache1.Ignite.GetAffinity(_cache1.Name);
-
-            foreach (var cache in new[] { _cache1, _cache2 })
-            {
-                cache.RemoveAll();
-
-                var localNode = cache.Ignite.GetCluster().GetLocalNode();
-
-                var localKeys = Enumerable.Range(1, int.MaxValue)
-                    .Where(x => aff.MapKeyToNode(x).Id == localNode.Id).Take(100).ToArray();
-
-                for (int index = 0; index < localKeys.Length; index++)
-                {
-                    var cacheKey = ctor(index, localKeys[index]);
-
-                    cache.Put(cacheKey, index.ToString());
-
-                    // Verify that key is stored locally accroding to AffinityKeyFieldName
-                    Assert.AreEqual(index.ToString(), cache.LocalPeek(cacheKey, CachePeekMode.Primary));
-
-                    // Other cache does not have this key locally
-                    var otherCache = cache == _cache1 ? _cache2 : _cache1;
-                    Assert.Throws<KeyNotFoundException>(() => otherCache.LocalPeek(cacheKey, CachePeekMode.All));
-                }
-            }
-        }
-
-        /// <summary>
-        /// Gets the configuration.
-        /// </summary>
-        private static IgniteConfiguration GetConfig(string gridName = null)
-        {
-            return new IgniteConfiguration(TestUtils.GetTestConfiguration())
-            {
-                GridName = gridName,
-                BinaryConfiguration = new BinaryConfiguration
-                {
-                    TypeConfigurations = new[]
-                    {
-                        new BinaryTypeConfiguration(typeof (CacheKey))
-                        {
-                            AffinityKeyFieldName = "AffinityKey"
-                        },
-                        new BinaryTypeConfiguration(typeof(CacheKeyAttr)),
-                        new BinaryTypeConfiguration(typeof (CacheKeyAttrOverride))
-                        {
-                            AffinityKeyFieldName = "AffinityKey"
-                        }
-                    }
-                },
-            };
-        }
-
-        private class CacheKey
-        {
-            public int Key { get; set; }
-            public int AffinityKey { get; set; }
-        }
-
-        private class CacheKeyAttr
-        {
-            public int Key { get; set; }
-            [AffinityKeyMapped] public int AffinityKey { get; set; }
-        }
-
-        private class CacheKeyAttrOverride
-        {
-            [AffinityKeyMapped] public int Key { get; set; }
-            public int AffinityKey { get; set; }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityTest.cs
deleted file mode 100644
index 689804c..0000000
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheAffinityTest.cs
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace Apache.Ignite.Core.Tests.Cache
-{
-    using Apache.Ignite.Core.Binary;
-    using Apache.Ignite.Core.Cache;
-    using Apache.Ignite.Core.Cluster;
-    using Apache.Ignite.Core.Impl;
-    using NUnit.Framework;
-
-    /// <summary>
-    /// Affinity key tests.
-    /// </summary>
-    public class CacheAffinityTest
-    {
-        /// <summary>
-        ///
-        /// </summary>
-        [TestFixtureSetUp]
-        public virtual void StartGrids()
-        {
-            TestUtils.KillProcesses();
-
-            IgniteConfiguration cfg = new IgniteConfiguration();
-
-            cfg.JvmClasspath = TestUtils.CreateTestClasspath();
-            cfg.JvmOptions = TestUtils.TestJavaOptions();
-            cfg.SpringConfigUrl = "config\\native-client-test-cache-affinity.xml";
-
-            for (int i = 0; i < 3; i++)
-            {
-                cfg.GridName = "grid-" + i;
-
-                Ignition.Start(cfg);
-            }
-        }
-
-        /// <summary>
-        /// Tear-down routine.
-        /// </summary>
-        [TestFixtureTearDown]
-        public virtual void StopGrids()
-        {
-            for (int i = 0; i < 3; i++)
-                Ignition.Stop("grid-" + i, true);
-        }
-
-        /// <summary>
-        /// Test affinity key.
-        /// </summary>
-        [Test]
-        public void TestAffinity()
-        {
-            IIgnite g = Ignition.GetIgnite("grid-0");
-
-            ICacheAffinity aff = g.GetAffinity(null);
-
-            IClusterNode node = aff.MapKeyToNode(new AffinityTestKey(0, 1));
-
-            for (int i = 0; i < 10; i++)
-                Assert.AreEqual(node.Id, aff.MapKeyToNode(new AffinityTestKey(i, 1)).Id);
-        }
-
-        /// <summary>
-        /// Test affinity with binary flag.
-        /// </summary>
-        [Test]
-        public void TestAffinityBinary()
-        {
-            IIgnite g = Ignition.GetIgnite("grid-0");
-
-            ICacheAffinity aff = g.GetAffinity(null);  
-
-            IBinaryObject affKey = g.GetBinary().ToBinary<IBinaryObject>(new AffinityTestKey(0, 1));
-
-            IClusterNode node = aff.MapKeyToNode(affKey);
-
-            for (int i = 0; i < 10; i++)
-            {
-                IBinaryObject otherAffKey =
-                    g.GetBinary().ToBinary<IBinaryObject>(new AffinityTestKey(i, 1));
-
-                Assert.AreEqual(node.Id, aff.MapKeyToNode(otherAffKey).Id);
-            }
-        }
-
-        /// <summary>
-        /// Affinity key.
-        /// </summary>
-        public class AffinityTestKey
-        {
-            /** ID. */
-            private int _id;
-
-            /** Affinity key. */
-            private int _affKey;
-
-            /// <summary>
-            /// Constructor.
-            /// </summary>
-            /// <param name="id">ID.</param>
-            /// <param name="affKey">Affinity key.</param>
-            public AffinityTestKey(int id, int affKey)
-            {
-                _id = id;
-                _affKey = affKey;
-            }
-
-            /** <inheritdoc /> */
-            public override bool Equals(object obj)
-            {
-                AffinityTestKey other = obj as AffinityTestKey;
-
-                return other != null && _id == other._id;
-            }
-
-            /** <inheritdoc /> */
-            public override int GetHashCode()
-            {
-                return _id;
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs
index eb73abe..da68ca2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheConfigurationTest.cs
@@ -271,7 +271,7 @@ namespace Apache.Ignite.Core.Tests.Cache
             var py = (AffinityFunctionBase) y;
 
             Assert.AreEqual(px.GetType(), py.GetType());
-            Assert.AreEqual(px.PartitionCount, py.PartitionCount);
+            Assert.AreEqual(px.Partitions, py.Partitions);
             Assert.AreEqual(px.ExcludeNeighbors, py.ExcludeNeighbors);
         }
 
@@ -552,7 +552,7 @@ namespace Apache.Ignite.Core.Tests.Cache
                 },
                 AffinityFunction = new RendezvousAffinityFunction
                 {
-                    PartitionCount = 513,
+                    Partitions = 513,
                     ExcludeNeighbors = true
                 }
             };
@@ -645,7 +645,7 @@ namespace Apache.Ignite.Core.Tests.Cache
                 },
                 AffinityFunction = new FairAffinityFunction
                 {
-                    PartitionCount = 113,
+                    Partitions = 113,
                     ExcludeNeighbors = false
                 }
             };

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/native-client-test-cache-affinity.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/native-client-test-cache-affinity.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/native-client-test-cache-affinity.xml
index 6fe3e70..9c7bfb0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/native-client-test-cache-affinity.xml
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/native-client-test-cache-affinity.xml
@@ -34,7 +34,7 @@
                             <list>
                                 <bean class="org.apache.ignite.platform.dotnet.PlatformDotNetBinaryTypeConfiguration">
                                     <property name="typeName"
-                                              value="Apache.Ignite.Core.Tests.Cache.CacheAffinityTest+AffinityTestKey"/>
+                                              value="Apache.Ignite.Core.Tests.Cache.Affinity.AffinityTest+AffinityTestKey"/>
                                     <property name="affinityKeyFieldName" value="_affKey"/>
                                 </bean>
                             </list>

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
index 3056273..e435cf6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
@@ -97,7 +97,7 @@ namespace Apache.Ignite.Core.Tests
                                     <nearConfiguration nearStartSize='7'>
                                         <evictionPolicy type='FifoEvictionPolicy' batchSize='10' maxSize='20' maxMemorySize='30' />
                                     </nearConfiguration>
-                                    <affinityFunction type='RendezvousAffinityFunction' partitionCount='99' excludeNeighbors='true' />
+                                    <affinityFunction type='RendezvousAffinityFunction' partitions='99' excludeNeighbors='true' />
                                 </cacheConfiguration>
                                 <cacheConfiguration name='secondCache' />
                             </cacheConfiguration>
@@ -172,7 +172,7 @@ namespace Apache.Ignite.Core.Tests
 
             var af = cacheCfg.AffinityFunction as RendezvousAffinityFunction;
             Assert.IsNotNull(af);
-            Assert.AreEqual(99, af.PartitionCount);
+            Assert.AreEqual(99, af.Partitions);
             Assert.IsTrue(af.ExcludeNeighbors);
 
             Assert.AreEqual(new Dictionary<string, object> {{"myNode", "true"}}, cfg.UserAttributes);
@@ -365,6 +365,14 @@ namespace Apache.Ignite.Core.Tests
                             IdMapper = new IdMapper(),
                             NameMapper = new NameMapper(),
                             Serializer = new TestSerializer()
+                        },
+                        new BinaryTypeConfiguration
+                        {
+                            IsEnum = false,
+                            KeepDeserialized = false,
+                            AffinityKeyFieldName = "affKeyFieldName",
+                            TypeName = "typeName2",
+                            Serializer = new BinaryReflectiveSerializer()
                         }
                     },
                     Types = new[] {typeof (string).FullName},
@@ -448,7 +456,7 @@ namespace Apache.Ignite.Core.Tests
                         AffinityFunction = new FairAffinityFunction
                         {
                             ExcludeNeighbors = true,
-                            PartitionCount = 48
+                            Partitions = 48
                         }
                     }
                 },

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs
index c6274ff..726fa3b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs
@@ -22,6 +22,8 @@ namespace Apache.Ignite.Core.Tests
     using System.Linq;
     using System.Reflection;
     using Apache.Ignite.Core.Tests.Binary;
+    using Apache.Ignite.Core.Tests.Cache.Affinity;
+    using Apache.Ignite.Core.Tests.Cache.Query;
     using Apache.Ignite.Core.Tests.Memory;
     using NUnit.ConsoleRunner;
 
@@ -46,9 +48,9 @@ namespace Apache.Ignite.Core.Tests
                 return;
             }
 
-            TestOne(typeof(BinaryStringTest), "Test");
+            //TestOne(typeof(BinaryStringTest), "Test");
 
-            //TestAll(typeof (CacheQueriesCodeConfigurationTest));
+            TestAll(typeof (AffinityFunctionTest));
             //TestAllInAssembly();
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index 46dbd94..e7f772f 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -84,6 +84,8 @@
   </ItemGroup>
   <ItemGroup>
     <Compile Include="Binary\BinaryReflectiveSerializer.cs" />
+    <Compile Include="Cache\Affinity\AffinityTopologyVersion.cs" />
+    <Compile Include="Cache\Affinity\AffinityFunctionContext.cs" />
     <Compile Include="Impl\Binary\BinaryReflectiveSerializerInternal.cs" />
     <Compile Include="Impl\Binary\IBinarySerializerInternal.cs" />
     <Compile Include="Binary\Package-Info.cs" />

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs
index ea5b21c..9b89780 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionBase.cs
@@ -18,10 +18,13 @@
 namespace Apache.Ignite.Core.Cache.Affinity
 {
     using System;
+    using System.Collections.Generic;
     using System.ComponentModel;
     using Apache.Ignite.Core.Binary;
     using Apache.Ignite.Core.Cache.Affinity.Fair;
     using Apache.Ignite.Core.Cache.Affinity.Rendezvous;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Common;
 
     /// <summary>
     /// Base class for predefined affinity functions.
@@ -37,14 +40,67 @@ namespace Apache.Ignite.Core.Cache.Affinity
         /** */
         private const byte TypeCodeRendezvous = 2;
 
-        /// <summary> The default value for <see cref="PartitionCount"/> property. </summary>
-        public const int DefaultPartitionCount = 1024;
+        /** */
+        private const byte TypeCodeUser = 3;
+
+        /// <summary> The default value for <see cref="Partitions"/> property. </summary>
+        public const int DefaultPartitions = 1024;
 
         /// <summary>
         /// Gets or sets the total number of partitions.
         /// </summary>
-        [DefaultValue(DefaultPartitionCount)]
-        public int PartitionCount { get; set; }
+        [DefaultValue(DefaultPartitions)]
+        public int Partitions { get; set; }
+
+        /// <summary>
+        /// Gets partition number for a given key starting from 0. Partitioned caches
+        /// should make sure that keys are about evenly distributed across all partitions
+        /// from 0 (inclusive) to <see cref="Partitions" /> (exclusive) for best performance.
+        /// <para />
+        /// Note that for fully replicated caches it is possible to segment key sets among different
+        /// grid node groups. In that case each node group should return a unique partition
+        /// number. However, unlike partitioned cache, mappings of keys to nodes in
+        /// replicated caches are constant and a node cannot migrate from one partition
+        /// to another.
+        /// </summary>
+        /// <param name="key">Key to get partition for.</param>
+        /// <returns>
+        /// Partition number for a given key.
+        /// </returns>
+        public int GetPartition(object key)
+        {
+            throw GetDirectUsageError();
+        }
+
+        /// <summary>
+        /// Removes node from affinity. This method is called when it is safe to remove
+        /// disconnected node from affinity mapping.
+        /// </summary>
+        /// <param name="nodeId">The node identifier.</param>
+        public void RemoveNode(Guid nodeId)
+        {
+            throw GetDirectUsageError();
+        }
+
+        /// <summary>
+        /// Gets affinity nodes for a partition. In case of replicated cache, all returned
+        /// nodes are updated in the same manner. In case of partitioned cache, the returned
+        /// list should contain only the primary and backup nodes with primary node being
+        /// always first.
+        /// <para />
+        /// Note that partitioned affinity must obey the following contract: given that node
+        /// <code>N</code> is primary for some key <code>K</code>, if any other node(s) leave
+        /// grid and no node joins grid, node <code>N</code> will remain primary for key <code>K</code>.
+        /// </summary>
+        /// <param name="context">The affinity function context.</param>
+        /// <returns>
+        /// A collection of partitions, where each partition is a collection of nodes,
+        /// where first node is a primary node, and other nodes are backup nodes.
+        /// </returns>
+        public IEnumerable<IEnumerable<IClusterNode>> AssignPartitions(AffinityFunctionContext context)
+        {
+            throw GetDirectUsageError();
+        }
 
         /// <summary>
         /// Gets or sets a value indicating whether to exclude same-host-neighbors from being backups of each other.
@@ -56,7 +112,7 @@ namespace Apache.Ignite.Core.Cache.Affinity
         /// </summary>
         internal AffinityFunctionBase()
         {
-            PartitionCount = DefaultPartitionCount;
+            Partitions = DefaultPartitions;
         }
 
         /// <summary>
@@ -77,11 +133,16 @@ namespace Apache.Ignite.Core.Cache.Affinity
                 case TypeCodeRendezvous:
                     fun = new RendezvousAffinityFunction();
                     break;
+                case TypeCodeUser:
+                    var f = reader.ReadObject<IAffinityFunction>();
+                    reader.ReadInt(); // skip partition count
+
+                    return f;
                 default:
                     throw new InvalidOperationException("Invalid AffinityFunction type code: " + typeCode);
             }
 
-            fun.PartitionCount = reader.ReadInt();
+            fun.Partitions = reader.ReadInt();
             fun.ExcludeNeighbors = reader.ReadBoolean();
 
             return fun;
@@ -100,17 +161,30 @@ namespace Apache.Ignite.Core.Cache.Affinity
 
             var p = fun as AffinityFunctionBase;
 
-            if (p == null)
+            if (p != null)
             {
-                throw new NotSupportedException(
-                    string.Format("Unsupported AffinityFunction: {0}. Only predefined affinity function types " +
-                                  "are supported: {1}, {2}", fun.GetType(), typeof(FairAffinityFunction),
-                        typeof(RendezvousAffinityFunction)));
+                writer.WriteByte(p is FairAffinityFunction ? TypeCodeFair : TypeCodeRendezvous);
+                writer.WriteInt(p.Partitions);
+                writer.WriteBoolean(p.ExcludeNeighbors);
             }
+            else
+            {
+                writer.WriteByte(TypeCodeUser);
+
+                if (!fun.GetType().IsSerializable)
+                    throw new IgniteException("AffinityFunction should be serializable.");
 
-            writer.WriteByte(p is FairAffinityFunction ? TypeCodeFair : TypeCodeRendezvous);
-            writer.WriteInt(p.PartitionCount);
-            writer.WriteBoolean(p.ExcludeNeighbors);
+                writer.WriteObject(fun);
+                writer.WriteInt(fun.Partitions);  // partition count is written once and can not be changed.
+            }
+        }
+
+        /// <summary>
+        /// Gets the direct usage error.
+        /// </summary>
+        private Exception GetDirectUsageError()
+        {
+            return new IgniteException(GetType() + " can not be used directly.");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e1c755c7/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs
new file mode 100644
index 0000000..1f44d8c
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Cache.Affinity
+{
+    using System.Collections.Generic;
+    using Apache.Ignite.Core.Binary;
+    using Apache.Ignite.Core.Cluster;
+    using Apache.Ignite.Core.Events;
+    using Apache.Ignite.Core.Impl;
+
+    /// <summary>
+    /// Affinity function context.
+    /// </summary>
+    public class AffinityFunctionContext
+    {
+        /** */
+        private readonly List<List<IClusterNode>> _previousAssignment;
+
+        /** */
+        private readonly int _backups;
+
+        /** */
+        private readonly ICollection<IClusterNode> _currentTopologySnapshot;
+
+        /** */
+        private readonly AffinityTopologyVersion _currentTopologyVersion;
+
+        /** */
+        private readonly DiscoveryEvent _discoveryEvent;
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="AffinityFunctionContext"/> class.
+        /// </summary>
+        /// <param name="reader">The reader.</param>
+        internal AffinityFunctionContext(IBinaryRawReader reader)
+        {
+            var cnt = reader.ReadInt();
+
+            if (cnt > 0)
+            {
+                _previousAssignment = new List<List<IClusterNode>>(cnt);
+
+                for (var i = 0; i < cnt; i++)
+                    _previousAssignment.Add(IgniteUtils.ReadNodes(reader));
+            }
+
+            _backups = reader.ReadInt();
+            _currentTopologySnapshot = IgniteUtils.ReadNodes(reader);
+            _currentTopologyVersion = new AffinityTopologyVersion(reader.ReadLong(), reader.ReadInt());
+            _discoveryEvent = EventReader.Read<DiscoveryEvent>(reader);
+        }
+
+        /// <summary>
+        /// Gets the affinity assignment for given partition on previous topology version.
+        /// First node in returned list is a primary node, other nodes are backups.
+        /// </summary>
+        /// <param name="partition">The partition to get previous assignment for.</param>
+        /// <returns>
+        /// List of nodes assigned to a given partition on previous topology version or <code>null</code>
+        /// if this information is not available.
+        /// </returns>
+        public ICollection<IClusterNode> GetPreviousAssignment(int partition)
+        {
+            return _previousAssignment == null ? null : _previousAssignment[partition];
+        }
+
+        /// <summary>
+        /// Gets number of backups for new assignment.
+        /// </summary>
+        public int Backups
+        {
+            get { return _backups; }
+        }
+
+        /// <summary>
+        /// Gets the current topology snapshot. Snapshot will contain only nodes on which the particular
+        /// cache is configured. List of passed nodes is guaranteed to be sorted in the same order
+        /// on all nodes on which partition assignment is performed.
+        /// </summary>
+        public ICollection<IClusterNode> CurrentTopologySnapshot
+        {
+            get { return _currentTopologySnapshot; }
+        }
+
+        /// <summary>
+        /// Gets the current topology version.
+        /// </summary>
+        public AffinityTopologyVersion CurrentTopologyVersion
+        {
+            get { return _currentTopologyVersion; }
+        }
+
+        /// <summary>
+        /// Gets the discovery event that caused the topology change.
+        /// </summary>
+        public DiscoveryEvent DiscoveryEvent
+        {
+            get { return _discoveryEvent; }
+        }
+    }
+}
\ No newline at end of file


Mime
View raw message