ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [02/11] ignite git commit: IGNITE-1364: Squashed commit from initial branch.
Date Thu, 03 Sep 2015 17:15:51 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/1fc06e01/modules/platform/src/main/cpp/core/src/impl/cache/query/query_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/cache/query/query_impl.cpp b/modules/platform/src/main/cpp/core/src/impl/cache/query/query_impl.cpp
new file mode 100644
index 0000000..7d89321
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/cache/query/query_impl.cpp
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ignite/impl/cache/query/query_impl.h"
+
+using namespace ignite::common::concurrent;
+using namespace ignite::common::java;
+using namespace ignite::impl::interop;
+using namespace ignite::impl::portable;
+
+namespace ignite
+{
+    namespace impl
+    {
+        namespace cache
+        {
+            namespace query
+            {
+                /** Operation code passed to TargetOutStream() by GetAll(): get all entries. */
+                const int32_t OP_GET_ALL = 1;
+
+                /** Operation code passed to TargetOutStream() by GetNext(): get single entry. */
+                const int32_t OP_GET_SINGLE = 3;
+
+                /**
+                 * Constructor.
+                 *
+                 * @param env Environment.
+                 * @param javaRef Java reference used for all cursor calls.
+                 */
+                QueryCursorImpl::QueryCursorImpl(SharedPointer<IgniteEnvironment> env, jobject javaRef) :
+                    env(env),
+                    javaRef(javaRef),
+                    iterCalled(false),
+                    getAllCalled(false),
+                    hasNext(false)
+                {
+                    // Nothing else to do; no Java call is made here.
+                }
+
+                /**
+                 * Destructor. Closes the cursor through the JNI context and then
+                 * releases the Java reference.
+                 */
+                QueryCursorImpl::~QueryCursorImpl()
+                {
+                    JniContext* jniCtx = env.Get()->Context();
+
+                    // Close first, then drop the reference.
+                    jniCtx->QueryCursorClose(javaRef);
+
+                    JniContext::Release(javaRef);
+                }
+
+                /**
+                 * Check whether the cursor has another entry.
+                 *
+                 * @param err Receives the error, if any.
+                 * @return True if another entry is available; false if not, if GetAll()
+                 *     was called before, or if the Java iterator could not be created.
+                 */
+                bool QueryCursorImpl::HasNext(IgniteError* err)
+                {
+                    // Iteration cannot be used once GetAll() has been called.
+                    if (getAllCalled)
+                    {
+                        *err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
+                            "Cannot use HasNext() method because GetAll() was called.");
+
+                        return false;
+                    }
+
+                    // The iterator is created on first use; hasNext is refreshed as part of that.
+                    return CreateIteratorIfNeeded(err) ? hasNext : false;
+                }
+
+                /**
+                 * Read the next entry and pass it to the given operation.
+                 *
+                 * @param op Operation that consumes the entry data.
+                 * @param err Receives the error, if any. When no entry is left, a generic
+                 *     error is stored unless err already holds an error.
+                 */
+                void QueryCursorImpl::GetNext(OutputOperation& op, IgniteError* err)
+                {
+                    // Iteration cannot be used once GetAll() has been called.
+                    if (getAllCalled)
+                    {
+                        *err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
+                            "Cannot use GetNext() method because GetAll() was called.");
+
+                        return;
+                    }
+
+                    // Create iterator in Java if needed.
+                    if (!CreateIteratorIfNeeded(err))
+                        return;
+
+                    if (!hasNext)
+                    {
+                        // Ensure we do not overwrite possible previous error.
+                        if (err->GetCode() == IgniteError::IGNITE_SUCCESS)
+                            *err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, "No more elements available.");
+
+                        return;
+                    }
+
+                    JniErrorInfo jniErr;
+
+                    SharedPointer<InteropMemory> mem = env.Get()->AllocateMemory();
+
+                    env.Get()->Context()->TargetOutStream(javaRef, OP_GET_SINGLE, mem.Get()->PointerLong(), &jniErr);
+
+                    IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
+
+                    if (jniErr.code != IGNITE_JNI_ERR_SUCCESS)
+                        return;
+
+                    InteropInputStream in(mem.Get());
+
+                    portable::PortableReaderImpl reader(&in);
+
+                    op.ProcessOutput(reader);
+
+                    // Refresh the cached flag for the following HasNext()/GetNext() call.
+                    hasNext = IteratorHasNext(err);
+                }
+
+                /**
+                 * Fetch all cursor entries with one call and pass them to the given operation.
+                 *
+                 * Fails with a generic error if an iteration method has already created the
+                 * Java iterator, or if a previous GetAll() call succeeded.
+                 *
+                 * @param op Operation that consumes the returned data.
+                 * @param err Receives the error, if any.
+                 */
+                void QueryCursorImpl::GetAll(OutputOperation& op, IgniteError* err)
+                {
+                    // Check whether any of iterator methods were called.
+                    if (iterCalled)
+                    {
+                        *err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
+                            "Cannot use GetAll() method because an iteration method was called.");
+
+                        return;
+                    }
+
+                    // Check whether GetAll was called before.
+                    if (getAllCalled)
+                    {
+                        *err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
+                            "Cannot use GetAll() method because it was already called.");
+
+                        return;
+                    }
+
+                    // Get data.
+                    JniErrorInfo jniErr;
+
+                    SharedPointer<InteropMemory> inMem = env.Get()->AllocateMemory();
+
+                    env.Get()->Context()->TargetOutStream(javaRef, OP_GET_ALL, inMem.Get()->PointerLong(), &jniErr);
+
+                    IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
+
+                    if (jniErr.code == IGNITE_JNI_ERR_SUCCESS)
+                    {
+                        // Only a successful fetch marks the cursor as consumed.
+                        getAllCalled = true;
+
+                        InteropInputStream in(inMem.Get());
+
+                        portable::PortableReaderImpl reader(&in);
+
+                        op.ProcessOutput(reader);
+                    }
+                }
+
+                /**
+                 * Create the Java iterator on first use and cache whether it has an entry.
+                 *
+                 * @param err Receives the error, if any.
+                 * @return False only if the iterator creation call failed; true if the
+                 *     iterator was created now or on an earlier call.
+                 */
+                bool QueryCursorImpl::CreateIteratorIfNeeded(IgniteError* err)
+                {
+                    if (iterCalled)
+                        return true;
+
+                    JniErrorInfo jniErr;
+
+                    env.Get()->Context()->QueryCursorIterator(javaRef, &jniErr);
+
+                    IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
+
+                    if (jniErr.code != IGNITE_JNI_ERR_SUCCESS)
+                        return false;
+
+                    iterCalled = true;
+
+                    hasNext = IteratorHasNext(err);
+
+                    return true;
+                }
+
+                /**
+                 * Ask the Java iterator whether it has another entry.
+                 *
+                 * @param err Receives the error, if any.
+                 * @return True only if the call succeeded and reported another entry.
+                 */
+                bool QueryCursorImpl::IteratorHasNext(IgniteError* err)
+                {
+                    JniErrorInfo jniErr;
+
+                    bool more = env.Get()->Context()->QueryCursorIteratorHasNext(javaRef, &jniErr);
+
+                    IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
+
+                    if (jniErr.code != IGNITE_JNI_ERR_SUCCESS)
+                        return false;
+
+                    return more;
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fc06e01/modules/platform/src/main/cpp/core/src/impl/handle_registry.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/handle_registry.cpp b/modules/platform/src/main/cpp/core/src/impl/handle_registry.cpp
new file mode 100644
index 0000000..c447faa
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/handle_registry.cpp
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ignite/impl/handle_registry.h"
+
+using namespace ignite::common::concurrent;
+
+namespace ignite
+{
+    namespace impl
+    {
+        /** Destructor. */
+        HandleRegistryEntry::~HandleRegistryEntry()
+        {
+            // Intentionally empty.
+        }
+
+        /**
+         * Constructor. Allocates an empty handle map and the critical section
+         * entered around every access to it.
+         */
+        HandleRegistrySegment::HandleRegistrySegment() :
+            map(new std::map<int64_t, SharedPointer<HandleRegistryEntry>>()),
+            mux(new CriticalSection())
+        {
+            // Both allocations are freed in the destructor.
+        }
+
+        /** Destructor. Frees the handle map and the critical section. */
+        HandleRegistrySegment::~HandleRegistrySegment()
+        {
+            // Map first, critical section second.
+            delete map;
+
+            delete mux;
+        }
+
+        /**
+         * Look up the entry registered under the given handle.
+         *
+         * @param hnd Handle.
+         * @return Associated entry, or a default-constructed shared pointer if the
+         *     handle is not present in this segment.
+         */
+        SharedPointer<HandleRegistryEntry> HandleRegistrySegment::Get(int64_t hnd)
+        {
+            SharedPointer<HandleRegistryEntry> res;
+
+            mux->Enter();
+
+            // Use find() rather than operator[]: the latter would insert an empty
+            // entry for every unknown handle and make the map grow on lookups.
+            std::map<int64_t, SharedPointer<HandleRegistryEntry> >::iterator it = map->find(hnd);
+
+            if (it != map->end())
+                res = it->second;
+
+            mux->Leave();
+
+            return res;
+        }
+
+        /**
+         * Store an entry under the given handle, replacing any entry already there.
+         *
+         * @param hnd Handle.
+         * @param entry Entry to store.
+         */
+        void HandleRegistrySegment::Put(int64_t hnd, const SharedPointer<HandleRegistryEntry>& entry)
+        {
+            mux->Enter();
+
+            std::map<int64_t, SharedPointer<HandleRegistryEntry>>& entries = *map;
+
+            entries[hnd] = entry;
+
+            mux->Leave();
+        }
+
+        /**
+         * Remove the entry stored under the given handle, if there is one.
+         *
+         * @param hnd Handle.
+         */
+        void HandleRegistrySegment::Remove(int64_t hnd)
+        {
+            mux->Enter();
+
+            (*map).erase(hnd);
+
+            mux->Leave();
+        }
+
+        /** Remove all entries from this segment. */
+        void HandleRegistrySegment::Clear()
+        {
+            mux->Enter();
+
+            map->clear();
+
+            mux->Leave();
+        }
+
+        /**
+         * Constructor.
+         *
+         * @param fastCap Number of slots in the directly indexed (fast) array.
+         * @param slowSegmentCnt Number of map-based (slow) segments.
+         */
+        HandleRegistry::HandleRegistry(int32_t fastCap, int32_t slowSegmentCnt)
+        {
+            // Fast part: array indexed by handle, all slots start out empty.
+            this->fastCap = fastCap;
+            fastCtr = 0;
+            fast = new SharedPointer<HandleRegistryEntry>[fastCap];
+
+            for (int slot = 0; slot < fastCap; ++slot)
+                fast[slot] = SharedPointer<HandleRegistryEntry>();
+
+            // Slow part: its handle counter starts at fastCap, right after the fast range.
+            this->slowSegmentCnt = slowSegmentCnt;
+            slowCtr = fastCap;
+            slow = new HandleRegistrySegment*[slowSegmentCnt];
+
+            for (int seg = 0; seg < slowSegmentCnt; ++seg)
+                slow[seg] = new HandleRegistrySegment();
+
+            closed = 0;
+
+            Memory::Fence();
+        }
+
+        /** Destructor. Closes the registry and frees the fast array and all slow segments. */
+        HandleRegistry::~HandleRegistry()
+        {
+            Close();
+
+            delete[] fast;
+
+            for (int seg = 0; seg < slowSegmentCnt; ++seg)
+                delete *(slow + seg);
+
+            delete[] slow;
+        }
+
+        /**
+         * Allocate a handle: not critical, not safe.
+         *
+         * @param target Entry to register.
+         * @return Result of Allocate0(): the handle, or -1 if the registry is closed.
+         */
+        int64_t HandleRegistry::Allocate(const SharedPointer<HandleRegistryEntry>& target)
+        {
+            const bool critical = false;
+            const bool safe = false;
+
+            return Allocate0(target, critical, safe);
+        }
+
+        /**
+         * Allocate a handle: critical, not safe.
+         *
+         * @param target Entry to register.
+         * @return Result of Allocate0(): the handle, or -1 if the registry is closed.
+         */
+        int64_t HandleRegistry::AllocateCritical(const SharedPointer<HandleRegistryEntry>& target)
+        {
+            const bool critical = true;
+            const bool safe = false;
+
+            return Allocate0(target, critical, safe);
+        }
+
+        /**
+         * Allocate a handle: not critical, safe.
+         *
+         * @param target Entry to register.
+         * @return Result of Allocate0(): the handle, or -1 if the registry is closed.
+         */
+        int64_t HandleRegistry::AllocateSafe(const SharedPointer<HandleRegistryEntry>& target)
+        {
+            const bool critical = false;
+            const bool safe = true;
+
+            return Allocate0(target, critical, safe);
+        }
+
+        /**
+         * Allocate a handle: critical and safe.
+         *
+         * @param target Entry to register.
+         * @return Result of Allocate0(): the handle, or -1 if the registry is closed.
+         */
+        int64_t HandleRegistry::AllocateCriticalSafe(const SharedPointer<HandleRegistryEntry>& target)
+        {
+            const bool critical = true;
+            const bool safe = true;
+
+            return Allocate0(target, critical, safe);
+        }
+
+        /**
+         * Release the entry registered under the given handle.
+         *
+         * Handles below fastCap address the fast array; all other handles are
+         * removed from the slow segment selected by hnd % slowSegmentCnt.
+         *
+         * @param hnd Handle. Negative values (such as the -1 returned by the
+         *     Allocate*() methods when the registry is closed) are ignored.
+         */
+        void HandleRegistry::Release(int64_t hnd)
+        {
+            // A negative handle maps to no slot; without this guard it would index
+            // the fast array out of bounds.
+            if (hnd < 0)
+                return;
+
+            if (hnd < fastCap)
+                fast[static_cast<int32_t>(hnd)] = SharedPointer<HandleRegistryEntry>();
+            else
+            {
+                HandleRegistrySegment* segment = *(slow + hnd % slowSegmentCnt);
+
+                segment->Remove(hnd);
+            }
+
+            Memory::Fence();
+        }
+
+        /**
+         * Get the entry registered under the given handle.
+         *
+         * Handles below fastCap are read from the fast array; all other handles
+         * are looked up in the slow segment selected by hnd % slowSegmentCnt.
+         *
+         * @param hnd Handle.
+         * @return Associated entry. A default-constructed shared pointer is returned
+         *     for negative handles (such as the -1 returned by the Allocate*() methods
+         *     when the registry is closed).
+         */
+        SharedPointer<HandleRegistryEntry> HandleRegistry::Get(int64_t hnd)
+        {
+            Memory::Fence();
+
+            // A negative handle maps to no slot; without this guard it would index
+            // the fast array out of bounds.
+            if (hnd < 0)
+                return SharedPointer<HandleRegistryEntry>();
+
+            if (hnd < fastCap)
+                return fast[static_cast<int32_t>(hnd)];
+
+            HandleRegistrySegment* segment = *(slow + hnd % slowSegmentCnt);
+
+            return segment->Get(hnd);
+        }
+
+        /**
+         * Close the registry and drop every registered entry. Only the call that
+         * switches the closed flag from 0 to 1 performs the cleanup.
+         */
+        void HandleRegistry::Close()
+        {
+            if (!Atomics::CompareAndSet32(&closed, 0, 1))
+                return;
+
+            // Cleanup fast-path handles.
+            for (int slot = 0; slot < fastCap; ++slot)
+                fast[slot] = SharedPointer<HandleRegistryEntry>();
+
+            // Cleanup slow-path handles.
+            for (int seg = 0; seg < slowSegmentCnt; ++seg)
+                slow[seg]->Clear();
+        }
+
+        /**
+         * Common allocation routine behind the public Allocate*() methods.
+         *
+         * @param target Entry to register.
+         * @param critical Whether to try the fast array before the slow segments.
+         * @param safe Whether to re-check the closed flag after the entry has been
+         *     stored and to undo the store if the registry turned out to be closed.
+         * @return Allocated handle, or -1 if the registry is closed.
+         */
+        int64_t HandleRegistry::Allocate0(const SharedPointer<HandleRegistryEntry>& target, bool critical, bool safe)
+        {
+            // Check closed state.
+            Memory::Fence();
+
+            if (closed == 1)
+                return -1;
+
+            // Try allocating entry on critical path.
+            if (critical && fastCtr < fastCap)
+            {
+                const int32_t slot = Atomics::IncrementAndGet32(&fastCtr) - 1;
+
+                // The incremented counter may already be past capacity; the slow path is used then.
+                if (slot < fastCap)
+                {
+                    fast[slot] = target;
+
+                    // Double-check for closed state if safe mode is on.
+                    Memory::Fence();
+
+                    if (safe && closed == 1)
+                    {
+                        fast[slot] = SharedPointer<HandleRegistryEntry>();
+
+                        return -1;
+                    }
+
+                    return slot;
+                }
+            }
+
+            // Either allocating on slow-path, or fast-path can no longer accommodate more entries.
+            const int64_t hnd = Atomics::IncrementAndGet64(&slowCtr) - 1;
+
+            HandleRegistrySegment* segment = slow[hnd % slowSegmentCnt];
+
+            segment->Put(hnd, target);
+
+            // Double-check for closed state if safe mode is on.
+            Memory::Fence();
+
+            if (safe && closed == 1)
+            {
+                segment->Remove(hnd);
+
+                return -1;
+            }
+
+            return hnd;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fc06e01/modules/platform/src/main/cpp/core/src/impl/ignite_environment.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/ignite_environment.cpp b/modules/platform/src/main/cpp/core/src/impl/ignite_environment.cpp
new file mode 100644
index 0000000..b20c543
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/ignite_environment.cpp
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ignite/impl/portable/portable_reader_impl.h"
+#include "ignite/impl/ignite_environment.h"
+#include "ignite/portable/portable.h"
+
+using namespace ignite::common::concurrent;
+using namespace ignite::common::java;
+using namespace ignite::impl::interop;
+using namespace ignite::impl::portable;
+using namespace ignite::portable;
+
+namespace ignite 
+{
+    namespace impl 
+    {
+        /**
+         * OnStart callback. Forwards the memory pointer to the target environment.
+         *
+         * @param target Target environment, passed as a pointer to SharedPointer<IgniteEnvironment>.
+         * @param memPtr Memory pointer.
+         */
+        void IGNITE_CALL OnStart(void* target, long long memPtr)
+        {
+            SharedPointer<IgniteEnvironment>& envRef = *static_cast<SharedPointer<IgniteEnvironment>*>(target);
+
+            envRef.Get()->OnStartCallback(memPtr);
+        }
+
+        /**
+         * OnStop callback. Deletes the shared pointer that was passed as target.
+         *
+         * @param target Target environment, passed as a pointer to SharedPointer<IgniteEnvironment>.
+         */
+        void IGNITE_CALL OnStop(void* target)
+        {
+            delete static_cast<SharedPointer<IgniteEnvironment>*>(target);
+        }
+
+        /**
+         * Default constructor. Starts with an empty JNI context and no instance
+         * name; allocates the latch and the metadata manager.
+         */
+        IgniteEnvironment::IgniteEnvironment() :
+            ctx(SharedPointer<JniContext>()),
+            latch(new SingleLatch),
+            name(NULL),
+            metaMgr(new PortableMetadataManager())
+        {
+            // The context is supplied later through Initialize().
+        }
+
+        /**
+         * Destructor. Frees the latch, the instance name buffer and the metadata manager.
+         */
+        IgniteEnvironment::~IgniteEnvironment()
+        {
+            delete latch;
+
+            // The name buffer is allocated with new char[] in OnStartCallback(), so it
+            // must be freed with the array form; delete[] on NULL is a no-op.
+            delete[] name;
+
+            delete metaMgr;
+        }
+
+        /**
+         * Build the JNI handlers structure for the given target.
+         *
+         * @param target Pointer handed back to the OnStart/OnStop callbacks.
+         * @return Handlers with target, onStart and onStop set and error set to NULL.
+         */
+        JniHandlers IgniteEnvironment::GetJniHandlers(SharedPointer<IgniteEnvironment>* target)
+        {
+            JniHandlers res = JniHandlers();
+
+            res.target = target;
+            res.error = NULL;
+
+            // Lifecycle callbacks defined at the top of this file.
+            res.onStart = OnStart;
+            res.onStop = OnStop;
+
+            return res;
+        }
+            
+        /**
+         * Store the JNI context and count down the latch.
+         *
+         * @param ctx JNI context.
+         */
+        void IgniteEnvironment::Initialize(SharedPointer<JniContext> ctx)
+        {
+            // The context is stored before the latch is counted down.
+            this->ctx = ctx;
+
+            latch->CountDown();
+        }
+        
+        /**
+         * Get the instance name.
+         *
+         * @return Name buffer set by OnStartCallback(), or NULL if none was set.
+         */
+        char* IgniteEnvironment::InstanceName()
+        {
+            char* res = name;
+
+            return res;
+        }
+
+        /**
+         * Get the JNI context.
+         *
+         * @return Raw pointer held by the stored shared pointer.
+         */
+        JniContext* IgniteEnvironment::Context()
+        {
+            JniContext* raw = ctx.Get();
+
+            return raw;
+        }
+
+        /**
+         * Allocate unpooled interop memory with the default capacity of 1024.
+         *
+         * @return Shared pointer owning the new memory object.
+         */
+        SharedPointer<InteropMemory> IgniteEnvironment::AllocateMemory()
+        {
+            return SharedPointer<InteropMemory>(new InteropUnpooledMemory(1024));
+        }
+
+        /**
+         * Allocate unpooled interop memory with the given capacity.
+         *
+         * @param cap Capacity.
+         * @return Shared pointer owning the new memory object.
+         */
+        SharedPointer<InteropMemory> IgniteEnvironment::AllocateMemory(int32_t cap)
+        {
+            return SharedPointer<InteropMemory>(new InteropUnpooledMemory(cap));
+        }
+
+        /**
+         * Wrap an existing memory chunk addressed by the given pointer value.
+         *
+         * @param memPtr Address of the memory header.
+         * @return InteropExternalMemory wrapper if the header flags mark the chunk
+         *     as external, InteropUnpooledMemory wrapper otherwise.
+         */
+        SharedPointer<InteropMemory> IgniteEnvironment::GetMemory(int64_t memPtr)
+        {
+            int8_t* hdr = reinterpret_cast<int8_t*>(memPtr);
+
+            const int32_t flags = InteropMemory::Flags(hdr);
+
+            if (InteropMemory::IsExternal(flags))
+                return SharedPointer<InteropMemory>(new InteropExternalMemory(hdr));
+
+            return SharedPointer<InteropMemory>(new InteropUnpooledMemory(hdr));
+        }
+
+        /**
+         * Get the metadata manager.
+         *
+         * @return Manager allocated in the constructor; it is deleted in the destructor.
+         */
+        PortableMetadataManager* IgniteEnvironment::GetMetadataManager()
+        {
+            PortableMetadataManager* mgr = metaMgr;
+
+            return mgr;
+        }
+
+        /**
+         * Start callback: reads the instance name from the given memory.
+         *
+         * @param memPtr Address of the memory header to read from.
+         */
+        void IgniteEnvironment::OnStartCallback(long long memPtr)
+        {
+            InteropExternalMemory mem(reinterpret_cast<int8_t*>(memPtr));
+            InteropInputStream stream(&mem);
+
+            PortableReaderImpl reader(&stream);
+
+            // The first call passes no buffer and only yields the string length.
+            const int32_t nameLen = reader.ReadString(NULL, 0);
+
+            if (nameLen < 0)
+            {
+                name = NULL;
+
+                return;
+            }
+
+            // One extra character on top of the reported length.
+            name = new char[nameLen + 1];
+            reader.ReadString(name, nameLen + 1);
+        }
+    }
+}
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fc06e01/modules/platform/src/main/cpp/core/src/impl/ignite_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/ignite_impl.cpp b/modules/platform/src/main/cpp/core/src/impl/ignite_impl.cpp
new file mode 100644
index 0000000..1aad309
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/ignite_impl.cpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ignite/impl/ignite_impl.h"
+
+using namespace ignite::common::concurrent;
+using namespace ignite::common::java;
+
+namespace ignite
+{    
+    namespace impl
+    {
+        /**
+         * Constructor.
+         *
+         * @param env Environment.
+         * @param javaRef Java reference; released in the destructor.
+         */
+        IgniteImpl::IgniteImpl(SharedPointer<IgniteEnvironment> env, jobject javaRef) :
+            env(env),
+            javaRef(javaRef)
+        {
+            // Nothing else to set up.
+        }
+
+        /** Destructor. Releases the Java reference. */
+        IgniteImpl::~IgniteImpl()
+        {
+            jobject ref = javaRef;
+
+            JniContext::Release(ref);
+        }
+
+        /**
+         * Get the instance name.
+         *
+         * @return Value of IgniteEnvironment::InstanceName() for the held environment.
+         */
+        char* IgniteImpl::GetName()
+        {
+            IgniteEnvironment* rawEnv = env.Get();
+
+            return rawEnv->InstanceName();
+        }
+    }    
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fc06e01/modules/platform/src/main/cpp/core/src/impl/interop/interop_input_stream.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/interop/interop_input_stream.cpp b/modules/platform/src/main/cpp/core/src/impl/interop/interop_input_stream.cpp
new file mode 100644
index 0000000..72340ee
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/interop/interop_input_stream.cpp
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cstring>
+
+#include "ignite/impl/interop/interop_input_stream.h"
+#include "ignite/ignite_error.h"
+
+/**
+ * Common macro to read a single value.
+ *
+ * Expands to a complete function body: checks that len bytes are available,
+ * reads a value of the given type at the current position, advances the
+ * position by len and returns the value.
+ */
+#define IGNITE_INTEROP_IN_READ(type, len) { \
+    EnsureEnoughData(len); \
+    type res = *reinterpret_cast<type*>(data + pos); \
+    Shift(len); \
+    return res; \
+}
+
+/**
+ * Common macro to read an array.
+ *
+ * Copies (len << shift) bytes from the stream into the res buffer of the
+ * enclosing function via CopyAndShift(). The callers in this file pass the
+ * element count as len and log2 of the element size in bytes as shift.
+ */
+#define IGNITE_INTEROP_IN_READ_ARRAY(len, shift) { \
+    CopyAndShift(reinterpret_cast<int8_t*>(res), 0, len << shift); \
+}
+
+namespace ignite
+{    
+    namespace impl
+    {
+        namespace interop 
+        {
+            union PortableInt32Float // Used by ReadFloat() to store a 32-bit integer and read it back as a float.
+            {
+                int32_t i; // Integer view; written by ReadFloat().
+                float f; // Float view of the same storage; returned by ReadFloat().
+            };
+
+            union PortableInt64Double // Used by ReadDouble() to store a 64-bit integer and read it back as a double.
+            {
+                int64_t i; // Integer view; written by ReadDouble().
+                double d; // Double view of the same storage; returned by ReadDouble().
+            };
+
+            /**
+             * Constructor. Starts reading at position 0 of the given memory.
+             *
+             * @param mem Memory to read from; only the pointer is stored.
+             */
+            InteropInputStream::InteropInputStream(InteropMemory* mem)
+            {
+                pos = 0;
+
+                this->mem = mem;
+
+                // Cache data pointer and length; Synchronize() re-reads them.
+                data = mem->Data();
+                len = mem->Length();
+            }
+
+            /**
+             * Read one signed byte and advance the position by 1.
+             *
+             * @return Value read.
+             */
+            int8_t InteropInputStream::ReadInt8()
+            {
+                EnsureEnoughData(1);
+
+                int8_t val = *reinterpret_cast<int8_t*>(data + pos);
+
+                Shift(1);
+
+                return val;
+            }
+
+            /**
+             * Read an array of signed bytes.
+             *
+             * @param res Destination buffer.
+             * @param len Number of elements to read (this parameter hides the stream length member).
+             */
+            void InteropInputStream::ReadInt8Array(int8_t* const res, const int32_t len)
+            {
+                // One byte per element, hence a shift of 0.
+                CopyAndShift(reinterpret_cast<int8_t*>(res), 0, len << 0);
+            }
+
+            /**
+             * Read a boolean stored as a single byte.
+             *
+             * @return True only if the byte equals 1.
+             */
+            bool InteropInputStream::ReadBool()
+            {
+                const int8_t raw = ReadInt8();
+
+                return raw == 1;
+            }
+
+            /**
+             * Read an array of booleans, one ReadBool() call per element.
+             *
+             * @param res Destination buffer.
+             * @param len Number of elements to read (this parameter hides the stream length member).
+             */
+            void InteropInputStream::ReadBoolArray(bool* const res, const int32_t len)
+            {
+                for (int idx = 0; idx < len; ++idx)
+                    res[idx] = ReadBool();
+            }
+                
+            /**
+             * Read a signed 16-bit value and advance the position by 2.
+             *
+             * @return Value read.
+             */
+            int16_t InteropInputStream::ReadInt16()
+            {
+                EnsureEnoughData(2);
+
+                int16_t val = *reinterpret_cast<int16_t*>(data + pos);
+
+                Shift(2);
+
+                return val;
+            }
+
+            /**
+             * Read an array of signed 16-bit values.
+             *
+             * @param res Destination buffer.
+             * @param len Number of elements to read (this parameter hides the stream length member).
+             */
+            void InteropInputStream::ReadInt16Array(int16_t* const res, const int32_t len)
+            {
+                // Two bytes per element, hence a shift of 1.
+                CopyAndShift(reinterpret_cast<int8_t*>(res), 0, len << 1);
+            }
+
+            /**
+             * Read an unsigned 16-bit value and advance the position by 2.
+             *
+             * @return Value read.
+             */
+            uint16_t InteropInputStream::ReadUInt16()
+            {
+                EnsureEnoughData(2);
+
+                uint16_t val = *reinterpret_cast<uint16_t*>(data + pos);
+
+                Shift(2);
+
+                return val;
+            }
+
+            /**
+             * Read an array of unsigned 16-bit values.
+             *
+             * @param res Destination buffer.
+             * @param len Number of elements to read (this parameter hides the stream length member).
+             */
+            void InteropInputStream::ReadUInt16Array(uint16_t* const res, const int32_t len)
+            {
+                // Two bytes per element, hence a shift of 1.
+                CopyAndShift(reinterpret_cast<int8_t*>(res), 0, len << 1);
+            }
+
+            /**
+             * Read a signed 32-bit value and advance the position by 4.
+             *
+             * @return Value read.
+             */
+            int32_t InteropInputStream::ReadInt32()
+            {
+                EnsureEnoughData(4);
+
+                int32_t val = *reinterpret_cast<int32_t*>(data + pos);
+
+                Shift(4);
+
+                return val;
+            }
+
+            /**
+             * Read a signed 32-bit value at the given position without moving the
+             * current stream position.
+             *
+             * @param pos Position to read at (this parameter hides the stream position member).
+             * @return Value read.
+             */
+            int32_t InteropInputStream::ReadInt32(int32_t pos)
+            {
+                // Bytes needed beyond the current position to cover [pos, pos + 4).
+                const int needed = pos + 4 - this->pos;
+
+                if (needed > 0)
+                    EnsureEnoughData(needed);
+
+                int8_t* src = data + pos;
+
+                return *reinterpret_cast<int32_t*>(src);
+            }
+
+            /**
+             * Read an array of signed 32-bit values.
+             *
+             * @param res Destination buffer.
+             * @param len Number of elements to read (this parameter hides the stream length member).
+             */
+            void InteropInputStream::ReadInt32Array(int32_t* const res, const int32_t len)
+            {
+                // Four bytes per element, hence a shift of 2.
+                CopyAndShift(reinterpret_cast<int8_t*>(res), 0, len << 2);
+            }
+
+            /**
+             * Read a signed 64-bit value and advance the position by 8.
+             *
+             * @return Value read.
+             */
+            int64_t InteropInputStream::ReadInt64()
+            {
+                EnsureEnoughData(8);
+
+                int64_t val = *reinterpret_cast<int64_t*>(data + pos);
+
+                Shift(8);
+
+                return val;
+            }
+
+            /**
+             * Read an array of signed 64-bit values.
+             *
+             * @param res Destination buffer.
+             * @param len Number of elements to read (this parameter hides the stream length member).
+             */
+            void InteropInputStream::ReadInt64Array(int64_t* const res, const int32_t len)
+            {
+                // Eight bytes per element, hence a shift of 3.
+                CopyAndShift(reinterpret_cast<int8_t*>(res), 0, len << 3);
+            }
+
+            /**
+             * Read a float: reads a 32-bit integer and returns the float member of
+             * the union it was stored into.
+             *
+             * @return Value read.
+             */
+            float InteropInputStream::ReadFloat()
+            {
+                PortableInt32Float bits;
+
+                bits.i = ReadInt32();
+
+                return bits.f;
+            }
+
+            /**
+             * Read an array of floats.
+             *
+             * @param res Destination buffer.
+             * @param len Number of elements to read (this parameter hides the stream length member).
+             */
+            void InteropInputStream::ReadFloatArray(float* const res, const int32_t len)
+            {
+                // Elements are copied as 4 bytes each, hence a shift of 2.
+                CopyAndShift(reinterpret_cast<int8_t*>(res), 0, len << 2);
+            }
+
+            /**
+             * Read a double: reads a 64-bit integer and returns the double member of
+             * the union it was stored into.
+             *
+             * @return Value read.
+             */
+            double InteropInputStream::ReadDouble()
+            {
+                PortableInt64Double bits;
+
+                bits.i = ReadInt64();
+
+                return bits.d;
+            }
+
+            /**
+             * Read an array of doubles.
+             *
+             * @param res Destination buffer.
+             * @param len Number of elements to read (this parameter hides the stream length member).
+             */
+            void InteropInputStream::ReadDoubleArray(double* const res, const int32_t len)
+            {
+                // Elements are copied as 8 bytes each, hence a shift of 3.
+                CopyAndShift(reinterpret_cast<int8_t*>(res), 0, len << 3);
+            }
+                
+            /**
+             * Get the number of bytes between the current position and the cached length.
+             *
+             * @return Remaining bytes.
+             */
+            int32_t InteropInputStream::Remaining()
+            {
+                const int32_t left = len - pos;
+
+                return left;
+            }
+
+            /**
+             * Get the current position.
+             *
+             * @return Position.
+             */
+            int32_t InteropInputStream::Position()
+            {
+                return this->pos;
+            }
+
+            /**
+             * Set the current position.
+             *
+             * Raises an IGNITE_ERR_MEMORY error through IGNITE_ERROR_FORMATTED_3 when
+             * the requested position lies outside [0, len].
+             *
+             * @param pos New position (this parameter hides the stream position member).
+             */
+            void InteropInputStream::Position(int32_t pos)
+            {
+                // Negative positions are rejected too: the later bounds checks compare
+                // (len - pos) with the requested size and would not catch them.
+                if (pos < 0 || pos > len) {
+                    IGNITE_ERROR_FORMATTED_3(IgniteError::IGNITE_ERR_MEMORY, "Requested input stream position is out of bounds",
+                        "memPtr", mem->PointerLong(), "len", len, "pos", pos);
+                }
+
+                this->pos = pos;
+            }
+
+            /**
+             * Re-read the cached data pointer and length from the underlying memory.
+             * The current position is left unchanged.
+             */
+            void InteropInputStream::Synchronize()
+            {
+                InteropMemory* src = mem;
+
+                data = src->Data();
+                len = src->Length();
+            }
+            
+            /**
+             * Check that at least cnt bytes are left after the current position and
+             * raise an IGNITE_ERR_MEMORY error through IGNITE_ERROR_FORMATTED_4 otherwise.
+             *
+             * @param cnt Number of bytes required.
+             */
+            void InteropInputStream::EnsureEnoughData(int32_t cnt)
+            {
+                const int32_t avail = len - pos;
+
+                if (avail < cnt) {
+                    IGNITE_ERROR_FORMATTED_4(IgniteError::IGNITE_ERR_MEMORY, "Not enough data in the stream",
+                        "memPtr", mem->PointerLong(), "len", len, "pos", pos, "requested", cnt);
+                }
+            }
+
+            /**
+             * Copy cnt bytes from the current position into dest + off and advance
+             * the position by cnt.
+             *
+             * @param dest Destination buffer.
+             * @param off Offset into the destination buffer.
+             * @param cnt Number of bytes to copy.
+             */
+            void InteropInputStream::CopyAndShift(int8_t* dest, int32_t off, int32_t cnt)
+            {
+                EnsureEnoughData(cnt);
+
+                int8_t* to = dest + off;
+                const int8_t* from = data + pos;
+
+                std::memcpy(to, from, cnt);
+
+                Shift(cnt);
+            }
+
+            /**
+             * Advance the current position. No bounds check is made here.
+             *
+             * @param cnt Number of bytes to advance by.
+             */
+            void InteropInputStream::Shift(int32_t cnt)
+            {
+                pos = pos + cnt;
+            }
+        }
+    }
+}        

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fc06e01/modules/platform/src/main/cpp/core/src/impl/interop/interop_memory.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/interop/interop_memory.cpp b/modules/platform/src/main/cpp/core/src/impl/interop/interop_memory.cpp
new file mode 100644
index 0000000..05ba8b6
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/interop/interop_memory.cpp
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ignite/common/java.h>
+
+#include "ignite/impl/interop/interop_memory.h"
+#include "ignite/ignite_error.h"
+
+using namespace ignite::common::java;
+
+namespace ignite
+{    
+    namespace impl
+    {
+        namespace interop 
+        {
+            /**
+             * Read the data pointer stored as a 64-bit value at the start of a memory header.
+             *
+             * @param memPtr Pointer to the memory header.
+             * @return Data pointer stored in the header.
+             */
+            int8_t* InteropMemory::Data(int8_t* memPtr)
+            {
+                const int64_t addr = *reinterpret_cast<int64_t*>(memPtr);
+
+                return reinterpret_cast<int8_t*>(addr);
+            }
+
+            /**
+             * Store a data pointer as a 64-bit value at the start of a memory header.
+             *
+             * @param memPtr Pointer to the memory header.
+             * @param ptr Data pointer to store.
+             */
+            void InteropMemory::Data(int8_t* memPtr, void* ptr)
+            {
+                int64_t* const slot = reinterpret_cast<int64_t*>(memPtr);
+
+                *slot = reinterpret_cast<int64_t>(ptr);
+            }
+
+            /**
+             * Read the 32-bit capacity field located at IGNITE_MEM_HDR_OFF_CAP in a memory header.
+             *
+             * @param memPtr Pointer to the memory header.
+             * @return Stored capacity.
+             */
+            int32_t InteropMemory::Capacity(int8_t* memPtr)
+            {
+                const int32_t* const field = reinterpret_cast<int32_t*>(memPtr + IGNITE_MEM_HDR_OFF_CAP);
+
+                return *field;
+            }
+
+            /**
+             * Write the 32-bit capacity field located at IGNITE_MEM_HDR_OFF_CAP in a memory header.
+             *
+             * @param memPtr Pointer to the memory header.
+             * @param val Capacity to store.
+             */
+            void InteropMemory::Capacity(int8_t* memPtr, int32_t val)
+            {
+                int32_t* const field = reinterpret_cast<int32_t*>(memPtr + IGNITE_MEM_HDR_OFF_CAP);
+
+                *field = val;
+            }
+
+            /**
+             * Read the 32-bit length field located at IGNITE_MEM_HDR_OFF_LEN in a memory header.
+             *
+             * @param memPtr Pointer to the memory header.
+             * @return Stored length.
+             */
+            int32_t InteropMemory::Length(int8_t* memPtr)
+            {
+                const int32_t* const field = reinterpret_cast<int32_t*>(memPtr + IGNITE_MEM_HDR_OFF_LEN);
+
+                return *field;
+            }
+
+            /**
+             * Write the 32-bit length field located at IGNITE_MEM_HDR_OFF_LEN in a memory header.
+             *
+             * @param memPtr Pointer to the memory header.
+             * @param val Length to store.
+             */
+            void InteropMemory::Length(int8_t* memPtr, int32_t val)
+            {
+                int32_t* const field = reinterpret_cast<int32_t*>(memPtr + IGNITE_MEM_HDR_OFF_LEN);
+
+                *field = val;
+            }
+
+            /**
+             * Read the 32-bit flags field located at IGNITE_MEM_HDR_OFF_FLAGS in a memory header.
+             *
+             * @param memPtr Pointer to the memory header.
+             * @return Stored flags.
+             */
+            int32_t InteropMemory::Flags(int8_t* memPtr)
+            {
+                const int32_t* const field = reinterpret_cast<int32_t*>(memPtr + IGNITE_MEM_HDR_OFF_FLAGS);
+
+                return *field;
+            }
+
+            /**
+             * Write the 32-bit flags field located at IGNITE_MEM_HDR_OFF_FLAGS in a memory header.
+             *
+             * @param memPtr Pointer to the memory header.
+             * @param val Flags to store.
+             */
+            void InteropMemory::Flags(int8_t* memPtr, int32_t val)
+            {
+                int32_t* const field = reinterpret_cast<int32_t*>(memPtr + IGNITE_MEM_HDR_OFF_FLAGS);
+
+                *field = val;
+            }
+
+            /**
+             * Evaluate the "external" predicate on the flags stored in a memory header.
+             *
+             * @param memPtr Pointer to the memory header.
+             * @return Result of IsExternal(int32_t) applied to the header's flags.
+             */
+            bool InteropMemory::IsExternal(int8_t* memPtr)
+            {
+                const int32_t flags = Flags(memPtr);
+
+                return IsExternal(flags);
+            }
+
+            /**
+             * Evaluate the "external" predicate on a flags value.
+             *
+             * NOTE(review): this returns true when the IGNITE_MEM_FLAG_EXT bits are NOT all set,
+             * whereas IsPooled/IsAcquired return true when their bit IS set - confirm the
+             * inverted sense is intended.
+             *
+             * @param flags Flags value.
+             * @return True if (flags & IGNITE_MEM_FLAG_EXT) differs from IGNITE_MEM_FLAG_EXT.
+             */
+            bool InteropMemory::IsExternal(int32_t flags)
+            {
+                const int32_t masked = (flags & IGNITE_MEM_FLAG_EXT);
+
+                return masked != IGNITE_MEM_FLAG_EXT;
+            }
+
+            /**
+             * Check the pooled flag on the flags stored in a memory header.
+             *
+             * @param memPtr Pointer to the memory header.
+             * @return Result of IsPooled(int32_t) applied to the header's flags.
+             */
+            bool InteropMemory::IsPooled(int8_t* memPtr)
+            {
+                const int32_t flags = Flags(memPtr);
+
+                return IsPooled(flags);
+            }
+
+            /**
+             * Check the pooled flag on a flags value.
+             *
+             * @param flags Flags value.
+             * @return True if any IGNITE_MEM_FLAG_POOLED bit is set.
+             */
+            bool InteropMemory::IsPooled(int32_t flags)
+            {
+                const int32_t masked = (flags & IGNITE_MEM_FLAG_POOLED);
+
+                return masked != 0;
+            }
+
+            /**
+             * Check the acquired flag on the flags stored in a memory header.
+             *
+             * @param memPtr Pointer to the memory header.
+             * @return Result of IsAcquired(int32_t) applied to the header's flags.
+             */
+            bool InteropMemory::IsAcquired(int8_t* memPtr)
+            {
+                const int32_t flags = Flags(memPtr);
+
+                return IsAcquired(flags);
+            }
+
+            /**
+             * Check the acquired flag on a flags value.
+             *
+             * @param flags Flags value.
+             * @return True if any IGNITE_MEM_FLAG_ACQUIRED bit is set.
+             */
+            bool InteropMemory::IsAcquired(int32_t flags)
+            {
+                const int32_t masked = (flags & IGNITE_MEM_FLAG_ACQUIRED);
+
+                return masked != 0;
+            }
+                
+            /**
+             * Get the pointer to this object's memory header.
+             *
+             * @return Header pointer held by this object.
+             */
+            int8_t* InteropMemory::Pointer()
+            {
+                return this->memPtr;
+            }
+
+            /**
+             * Get the header pointer converted to a 64-bit integer.
+             *
+             * @return Header pointer as int64_t.
+             */
+            int64_t InteropMemory::PointerLong()
+            {
+                const int64_t addr = reinterpret_cast<int64_t>(this->memPtr);
+
+                return addr;
+            }
+
+            /**
+             * Get the data pointer stored in this object's header.
+             *
+             * @return Data pointer.
+             */
+            int8_t* InteropMemory::Data()
+            {
+                return Data(this->memPtr);
+            }
+
+            /**
+             * Get the capacity stored in this object's header.
+             *
+             * @return Capacity.
+             */
+            int32_t InteropMemory::Capacity()
+            {
+                return Capacity(this->memPtr);
+            }
+
+            /**
+             * Get the length stored in this object's header.
+             *
+             * @return Length.
+             */
+            int32_t InteropMemory::Length()
+            {
+                return Length(this->memPtr);
+            }
+
+            /**
+             * Store a new length in this object's header.
+             *
+             * @param val Length to store.
+             */
+            void InteropMemory::Length(int32_t val)
+            {
+                Length(this->memPtr, val);
+            }
+                
+            /**
+             * Allocate a header and a data buffer of the given capacity; this object owns both.
+             *
+             * The header is initialised with the buffer pointer, the capacity, zero length and
+             * IGNITE_MEM_FLAG_EXT. If either allocation fails, whatever was allocated is freed and
+             * the failure is reported through IGNITE_ERROR_FORMATTED_2 with IGNITE_ERR_MEMORY.
+             *
+             * @param cap Capacity of the data buffer in bytes.
+             */
+            InteropUnpooledMemory::InteropUnpooledMemory(int32_t cap)
+            {
+                int8_t* hdr = static_cast<int8_t*>(malloc(IGNITE_MEM_HDR_LEN));
+                void* buf = malloc(cap);
+
+                // malloc(0) may legitimately return NULL, so a missing buffer is an error only for non-zero cap.
+                if (!hdr || (!buf && cap != 0)) {
+                    free(buf);
+                    free(hdr);
+
+                    IGNITE_ERROR_FORMATTED_2(IgniteError::IGNITE_ERR_MEMORY, "Failed to allocate unpooled memory",
+                        "headerLength", IGNITE_MEM_HDR_LEN, "requestedCapacity", cap);
+                }
+
+                memPtr = hdr;
+
+                Data(memPtr, buf);
+                Capacity(memPtr, cap);
+                Length(memPtr, 0);
+                Flags(memPtr, IGNITE_MEM_FLAG_EXT);
+
+                owning = true;
+            }
+
+            /**
+             * Wrap an existing memory header without taking ownership of it.
+             *
+             * @param memPtr Pointer to an existing memory header.
+             */
+            InteropUnpooledMemory::InteropUnpooledMemory(int8_t* memPtr)
+            {
+                this->memPtr = memPtr;
+                this->owning = false;
+            }
+
+            /**
+             * Release the data buffer and the header, but only when this object owns them.
+             */
+            InteropUnpooledMemory::~InteropUnpooledMemory()
+            {
+                if (!owning)
+                    return;
+
+                // The data pointer lives inside the header, so it has to be freed first.
+                free(Data());
+                free(memPtr);
+            }
+
+            /**
+             * Grow the data buffer to at least the requested capacity.
+             *
+             * The new capacity is the larger of the requested value and twice the current
+             * capacity. If realloc fails, the header keeps the old buffer and capacity, and the
+             * failure is reported through IGNITE_ERROR_FORMATTED_2 with IGNITE_ERR_MEMORY.
+             *
+             * @param cap Requested capacity in bytes.
+             */
+            void InteropUnpooledMemory::Reallocate(int32_t cap)
+            {
+                const int32_t curCap = Capacity();
+
+                // Doubling is skipped once it would overflow int32_t (signed overflow is undefined).
+                if (curCap <= 0x3FFFFFFF && curCap * 2 > cap)
+                    cap = curCap * 2;
+
+                void* newData = realloc(Data(memPtr), cap);
+
+                // On failure realloc leaves the old block valid; storing NULL would lose and leak it.
+                if (!newData && cap != 0) {
+                    IGNITE_ERROR_FORMATTED_2(IgniteError::IGNITE_ERR_MEMORY, "Failed to reallocate unpooled memory",
+                        "memPtr", PointerLong(), "requestedCapacity", cap);
+                }
+
+                Data(memPtr, newData);
+                Capacity(memPtr, cap);
+            }
+
+            /**
+             * Wrap an existing memory header.
+             *
+             * @param memPtr Pointer to the memory header.
+             */
+            InteropExternalMemory::InteropExternalMemory(int8_t* memPtr) 
+            {
+                this->memPtr = memPtr;
+            }
+
+            /**
+             * Ask JniContext to reallocate the memory behind this header.
+             *
+             * A return value of -1 from JniContext::Reallocate is reported through
+             * IGNITE_ERROR_FORMATTED_2 with IGNITE_ERR_MEMORY.
+             *
+             * @param cap Requested capacity.
+             */
+            void InteropExternalMemory::Reallocate(int32_t cap)
+            {
+                const int64_t addr = reinterpret_cast<int64_t>(memPtr);
+
+                if (JniContext::Reallocate(addr, cap) == -1) {
+                    // Terminated with ';' like every other use of the error macros.
+                    IGNITE_ERROR_FORMATTED_2(IgniteError::IGNITE_ERR_MEMORY, "Failed to reallocate external memory", 
+                        "memPtr", PointerLong(), "requestedCapacity", cap);
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fc06e01/modules/platform/src/main/cpp/core/src/impl/interop/interop_output_stream.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/interop/interop_output_stream.cpp b/modules/platform/src/main/cpp/core/src/impl/interop/interop_output_stream.cpp
new file mode 100644
index 0000000..ecdfd42
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/interop/interop_output_stream.cpp
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cstring>
+
+#include "ignite/impl/interop/interop_output_stream.h"
+#include "ignite/ignite_error.h"
+
+/**
+ * Common macro to write a single value.
+ *
+ * Expands to: EnsureCapacity(pos + len), a store of val as `type` at (data + pos),
+ * then Shift(len). It relies on `pos`, `data`, `EnsureCapacity` and `Shift` being
+ * visible at the expansion site.
+ *
+ * val  - value to store.
+ * type - type the destination address is cast to.
+ * len  - number of bytes the value occupies.
+ */
+#define IGNITE_INTEROP_OUT_WRITE(val, type, len) { \
+    EnsureCapacity(pos + len); \
+    *reinterpret_cast<type*>(data + pos) = val; \
+    Shift(len); \
+}
+
+/**
+ * Common macro to write an array.
+ *
+ * Expands to a CopyAndShift call that treats val as a byte pointer and copies
+ * len bytes starting at source offset 0.
+ *
+ * val - pointer to the first element.
+ * len - number of BYTES to copy (callers scale the element count themselves).
+ */
+#define IGNITE_INTEROP_OUT_WRITE_ARRAY(val, len) { \
+    CopyAndShift(reinterpret_cast<const int8_t*>(val), 0, len); \
+}
+
+namespace ignite
+{
+    namespace impl
+    {
+        namespace interop 
+        {
+            /**
+             * Overlays a float and a 32-bit integer in the same storage.
+             */
+            union PortableFloatInt32
+            {
+                float f;
+                int32_t i;                
+            };
+
+            /**
+             * Overlays a double and a 64-bit integer in the same storage.
+             */
+            union PortableDoubleInt64
+            {
+                double d;
+                int64_t i;                
+            };
+
+            /**
+             * Create an output stream over the given memory, positioned at offset 0.
+             *
+             * The data pointer and capacity are cached from the memory object at construction.
+             *
+             * @param mem Memory to write into.
+             */
+            InteropOutputStream::InteropOutputStream(InteropMemory* mem)
+            {
+                this->mem = mem;
+                this->pos = 0;
+
+                this->data = mem->Data();
+                this->cap = mem->Capacity();
+            }
+
+            /**
+             * Write one byte at the current position and advance by 1.
+             *
+             * @param val Value to write.
+             */
+            void InteropOutputStream::WriteInt8(const int8_t val)
+            {
+                IGNITE_INTEROP_OUT_WRITE(val, int8_t, 1);
+            }
+
+            /**
+             * Write one byte at an explicit position; the stream position is not changed.
+             *
+             * Note the argument order (value, position) is the reverse of
+             * WriteInt32(position, value).
+             *
+             * @param val Value to write.
+             * @param pos Absolute position to write at.
+             */
+            void InteropOutputStream::WriteInt8(const int8_t val, const int32_t pos)
+            {
+                EnsureCapacity(pos + 1);
+
+                // Resolve the target only after EnsureCapacity, which may replace the data pointer.
+                int8_t* const target = data + pos;
+
+                *target = val;
+            }
+
+            /**
+             * Copy len bytes from val to the current position and advance by len.
+             *
+             * @param val Source array.
+             * @param len Number of elements (bytes) to write.
+             */
+            void InteropOutputStream::WriteInt8Array(const int8_t* val, const int32_t len)
+            {
+                IGNITE_INTEROP_OUT_WRITE_ARRAY(val, len);
+            }
+
+            /**
+             * Write a boolean as a single byte: 1 for true, 0 for false.
+             *
+             * @param val Value to write.
+             */
+            void InteropOutputStream::WriteBool(const bool val)
+            {
+                const int8_t encoded = val ? 1 : 0;
+
+                WriteInt8(encoded);
+            }
+
+            /**
+             * Write len booleans one by one, each through WriteBool.
+             *
+             * @param val Source array.
+             * @param len Number of elements to write.
+             */
+            void InteropOutputStream::WriteBoolArray(const bool* val, const int32_t len)
+            {
+                int idx = 0;
+
+                while (idx < len)
+                {
+                    WriteBool(val[idx]);
+
+                    idx++;
+                }
+            }
+
+            /**
+             * Write a 16-bit signed integer at the current position and advance by 2 bytes.
+             *
+             * @param val Value to write.
+             */
+            void InteropOutputStream::WriteInt16(const int16_t val)
+            {
+                IGNITE_INTEROP_OUT_WRITE(val, int16_t, 2);
+            }
+
+            /**
+             * Copy len 16-bit signed integers (len << 1 bytes) to the current position and advance.
+             *
+             * @param val Source array.
+             * @param len Number of elements to write.
+             */
+            void InteropOutputStream::WriteInt16Array(const int16_t* val, const int32_t len)
+            {
+                IGNITE_INTEROP_OUT_WRITE_ARRAY(val, len << 1);
+            }
+
+            /**
+             * Write a 16-bit unsigned integer at the current position and advance by 2 bytes.
+             *
+             * @param val Value to write.
+             */
+            void InteropOutputStream::WriteUInt16(const uint16_t val)
+            {
+                IGNITE_INTEROP_OUT_WRITE(val, uint16_t, 2);
+            }
+
+            /**
+             * Copy len 16-bit unsigned integers (len << 1 bytes) to the current position and advance.
+             *
+             * @param val Source array.
+             * @param len Number of elements to write.
+             */
+            void InteropOutputStream::WriteUInt16Array(const uint16_t* val, const int32_t len)
+            {
+                IGNITE_INTEROP_OUT_WRITE_ARRAY(val, len << 1);
+            }
+
+            /**
+             * Write a 32-bit integer at the current position and advance by 4 bytes.
+             *
+             * @param val Value to write.
+             */
+            void InteropOutputStream::WriteInt32(const int32_t val)
+            {
+                IGNITE_INTEROP_OUT_WRITE(val, int32_t, 4);
+            }
+
+            /**
+             * Write a 32-bit integer at an explicit position; the stream position is not changed.
+             *
+             * Note the argument order (position, value) is the reverse of
+             * WriteInt8(value, position).
+             *
+             * @param pos Absolute position to write at.
+             * @param val Value to write.
+             */
+            void InteropOutputStream::WriteInt32(const int32_t pos, const int32_t val)
+            {
+                EnsureCapacity(pos + 4);
+
+                // Resolve the target only after EnsureCapacity, which may replace the data pointer.
+                int32_t* const target = reinterpret_cast<int32_t*>(data + pos);
+
+                *target = val;
+            }
+
+            /**
+             * Copy len 32-bit integers (len << 2 bytes) to the current position and advance.
+             *
+             * @param val Source array.
+             * @param len Number of elements to write.
+             */
+            void InteropOutputStream::WriteInt32Array(const int32_t* val, const int32_t len)
+            {
+                IGNITE_INTEROP_OUT_WRITE_ARRAY(val, len << 2);
+            }
+
+            /**
+             * Write a 64-bit integer at the current position and advance by 8 bytes.
+             *
+             * @param val Value to write.
+             */
+            void InteropOutputStream::WriteInt64(const int64_t val)
+            {
+                IGNITE_INTEROP_OUT_WRITE(val, int64_t, 8);
+            }
+
+            /**
+             * Copy len 64-bit integers (len << 3 bytes) to the current position and advance.
+             *
+             * @param val Source array.
+             * @param len Number of elements to write.
+             */
+            void InteropOutputStream::WriteInt64Array(const int64_t* val, const int32_t len)
+            {
+                IGNITE_INTEROP_OUT_WRITE_ARRAY(val, len << 3);
+            }
+
+            /**
+             * Write a float by emitting its 4-byte object representation as a 32-bit integer.
+             *
+             * @param val Value to write.
+             */
+            void InteropOutputStream::WriteFloat(const float val)
+            {
+                int32_t bits;
+
+                // memcpy is the well-defined way to reinterpret the bytes; reading the inactive
+                // member of a union is undefined behaviour in C++.
+                memcpy(&bits, &val, sizeof(bits));
+
+                WriteInt32(bits);
+            }
+
+            /**
+             * Write len floats one by one, each through WriteFloat.
+             *
+             * @param val Source array.
+             * @param len Number of elements to write.
+             */
+            void InteropOutputStream::WriteFloatArray(const float* val, const int32_t len)
+            {
+                int idx = 0;
+
+                while (idx < len)
+                {
+                    WriteFloat(val[idx]);
+
+                    idx++;
+                }
+            }
+
+            /**
+             * Write a double by emitting its 8-byte object representation as a 64-bit integer.
+             *
+             * @param val Value to write.
+             */
+            void InteropOutputStream::WriteDouble(const double val)
+            {
+                int64_t bits;
+
+                // memcpy is the well-defined way to reinterpret the bytes; reading the inactive
+                // member of a union is undefined behaviour in C++.
+                memcpy(&bits, &val, sizeof(bits));
+
+                WriteInt64(bits);
+            }
+
+            /**
+             * Write len doubles one by one, each through WriteDouble.
+             *
+             * @param val Source array.
+             * @param len Number of elements to write.
+             */
+            void InteropOutputStream::WriteDoubleArray(const double* val, const int32_t len)
+            {
+                int idx = 0;
+
+                while (idx < len)
+                {
+                    WriteDouble(val[idx]);
+
+                    idx++;
+                }
+            }
+
+            /**
+             * Get the current write position.
+             *
+             * @return Current value of the position field.
+             */
+            int32_t InteropOutputStream::Position()
+            {
+                return this->pos;
+            }
+
+            /**
+             * Set the write position, growing the buffer first if the position exceeds the capacity.
+             *
+             * @param val New position.
+             */
+            void InteropOutputStream::Position(const int32_t val)
+            {
+                EnsureCapacity(val);
+
+                this->pos = val;
+            }
+
+            /**
+             * Publish the current position as the length of the underlying memory.
+             */
+            void InteropOutputStream::Synchronize()
+            {
+                const int32_t written = pos;
+
+                mem->Length(written);
+            }
+
+            /**
+             * Make sure the buffer can hold at least reqCap bytes.
+             *
+             * When growth is needed the new capacity is the larger of reqCap and twice the
+             * cached capacity; the memory is reallocated and the cached data pointer and
+             * capacity are refreshed.
+             *
+             * @param reqCap Required capacity in bytes.
+             */
+            void InteropOutputStream::EnsureCapacity(int32_t reqCap) {
+                if (reqCap <= cap)
+                    return;
+
+                // Doubling is skipped once it would overflow int32_t (signed overflow is undefined).
+                int32_t newCap = cap <= 0x3FFFFFFF ? cap * 2 : reqCap;
+
+                if (newCap < reqCap)
+                    newCap = reqCap;
+
+                mem->Reallocate(newCap);
+
+                // Reallocation may move the buffer, so the cached pointer has to be re-read.
+                data = mem->Data();
+                cap = newCap;
+            }
+
+            /**
+             * Advance the write position by the given number of bytes.
+             *
+             * @param cnt Number of bytes to add to the position.
+             */
+            void InteropOutputStream::Shift(int32_t cnt) {
+                pos = pos + cnt;
+            }
+
+            /**
+             * Copy len bytes from a source buffer to the current position and advance by len.
+             *
+             * @param src Source buffer.
+             * @param off Offset inside the source buffer at which copying starts.
+             * @param len Number of bytes to copy.
+             */
+            void InteropOutputStream::CopyAndShift(const int8_t* src, int32_t off, int32_t len) {
+                EnsureCapacity(pos + len);
+
+                // Resolve the target only after EnsureCapacity, which may replace the data pointer.
+                void* target = data + pos;
+                const void* source = src + off;
+
+                memcpy(target, source, len);
+
+                Shift(len);
+            }
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fc06e01/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_handler.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_handler.cpp b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_handler.cpp
new file mode 100644
index 0000000..5ca91dc
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_handler.cpp
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ignite/impl/portable/portable_metadata_handler.h"
+
+using namespace ignite::common::concurrent;
+
+namespace ignite
+{    
+    namespace impl
+    {
+        namespace portable
+        {
+            /**
+             * Create a handler over the given snapshot with no recorded difference.
+             *
+             * @param snap Snapshot the handler compares written fields against (may hold NULL).
+             */
+            PortableMetadataHandler::PortableMetadataHandler(SPSnap snap) :
+                snap(snap),
+                fieldIds(NULL),
+                fields(NULL)
+            {
+                // No-op.
+            }
+            
+            /**
+             * Release the difference containers, if any were created.
+             */
+            PortableMetadataHandler::~PortableMetadataHandler()
+            {
+                // Deleting a null pointer is a no-op, so no explicit checks are needed.
+                delete fieldIds;
+                delete fields;
+            }
+
+            /**
+             * Record a written field if the snapshot does not already contain its ID.
+             *
+             * The difference containers are created lazily on the first unknown field.
+             *
+             * @param fieldId Field ID.
+             * @param fieldName Field name.
+             * @param fieldTypeId Field type ID.
+             */
+            void PortableMetadataHandler::OnFieldWritten(int32_t fieldId, std::string fieldName, int32_t fieldTypeId)
+            {
+                const bool alreadyKnown = snap.Get() && snap.Get()->ContainsFieldId(fieldId);
+
+                if (alreadyKnown)
+                    return;
+
+                if (!HasDifference())
+                {
+                    fieldIds = new std::set<int32_t>();
+                    fields = new std::map<std::string, int32_t>();
+                }
+
+                fieldIds->insert(fieldId);
+                (*fields)[fieldName] = fieldTypeId;
+            }
+
+            /**
+             * Get the snapshot this handler was created with.
+             *
+             * @return Copy of the shared pointer to the snapshot.
+             */
+            SPSnap PortableMetadataHandler::GetSnapshot()
+            {
+                return this->snap;
+            }
+
+            /**
+             * Tell whether any field not present in the snapshot has been recorded.
+             *
+             * @return True if the field ID container has been created.
+             */
+            bool PortableMetadataHandler::HasDifference()
+            {
+                return fieldIds != NULL;
+            }
+
+            /**
+             * Get the recorded field IDs.
+             *
+             * @return Pointer to the field ID set, or NULL if no difference has been recorded.
+             */
+            std::set<int32_t>* PortableMetadataHandler::GetFieldIds()
+            {
+                return this->fieldIds;
+            }
+
+            /**
+             * Get the recorded fields (name to type ID).
+             *
+             * @return Pointer to the field map, or NULL if no difference has been recorded.
+             */
+            std::map<std::string, int32_t>* PortableMetadataHandler::GetFields()
+            {
+                return this->fields;
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fc06e01/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_manager.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_manager.cpp b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_manager.cpp
new file mode 100644
index 0000000..63e92a9
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_manager.cpp
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ignite/common/concurrent.h>
+
+#include "ignite/impl/portable/portable_metadata_manager.h"
+
+using namespace ignite::common::concurrent;
+
+namespace ignite
+{    
+    namespace impl
+    {
+        namespace portable
+        {
+            /**
+             * Create a manager with an empty snapshot map, an empty pending list,
+             * a fresh critical section and both version counters at zero.
+             */
+            PortableMetadataManager::PortableMetadataManager() :
+                snapshots(SharedPointer<std::map<int32_t, SPSnap>>(new std::map<int32_t, SPSnap>)),
+                pending(new std::vector<SPSnap>()),
+                cs(new CriticalSection()),
+                pendingVer(0),
+                ver(0)
+            {
+                // No-op.
+            }
+
+            /**
+             * Drop all pending snapshots and release the pending list and the critical section.
+             */
+            PortableMetadataManager::~PortableMetadataManager()
+            {
+                pending->clear();
+
+                delete pending;
+                delete cs;
+            }
+
+            /**
+             * Create a metadata handler for the given type.
+             *
+             * The handler is given the snapshot currently registered for typeId, or an empty
+             * shared pointer when the type is not registered yet.
+             *
+             * @param typeId Type ID.
+             * @return New handler wrapped in a shared pointer.
+             */
+            SharedPointer<PortableMetadataHandler> PortableMetadataManager::GetHandler(int32_t typeId)
+            {
+                // Work on a local copy of the shared pointer to the current snapshot map.
+                SharedPointer<std::map<int32_t, SPSnap>> snapshots0 = snapshots;
+
+                // find() instead of operator[]: operator[] would insert an empty entry into the
+                // shared map on every lookup of an unknown type, mutating it outside the lock.
+                std::map<int32_t, SPSnap>::iterator found = snapshots0.Get()->find(typeId);
+
+                SPSnap snapshot = found != snapshots0.Get()->end() ? found->second : SPSnap();
+
+                return SharedPointer<PortableMetadataHandler>(new PortableMetadataHandler(snapshot));
+            }
+
+            /**
+             * Enqueue the metadata gathered by a handler for a later update.
+             *
+             * A snapshot is enqueued when the handler has no base snapshot (first write of the
+             * type) or when it recorded fields missing from the base snapshot. The enqueued
+             * snapshot contains the base fields plus the handler's difference.
+             *
+             * @param typeName Type name.
+             * @param typeId Type ID.
+             * @param hnd Handler to take the metadata from.
+             */
+            void PortableMetadataManager::SubmitHandler(std::string typeName, int32_t typeId,
+                PortableMetadataHandler* hnd)
+            {
+                Snap* baseSnap = hnd->GetSnapshot().Get();
+                const bool changed = hnd->HasDifference();
+
+                // Nothing to enqueue when the type is already known and no new fields were written.
+                if (baseSnap && !changed)
+                    return;
+
+                std::set<int32_t>* mergedIds = new std::set<int32_t>();
+                std::map<std::string, int32_t>* mergedFields = new std::map<std::string, int32_t>();
+
+                // Start from the fields of the base snapshot (if any) ...
+                CopyFields(baseSnap, mergedIds, mergedFields);
+
+                // ... then layer the handler's newly written fields on top.
+                if (changed)
+                {
+                    std::set<int32_t>* addedIds = hnd->GetFieldIds();
+                    std::map<std::string, int32_t>* addedFields = hnd->GetFields();
+
+                    mergedIds->insert(addedIds->begin(), addedIds->end());
+
+                    for (std::map<std::string, int32_t>::iterator cur = addedFields->begin();
+                        cur != addedFields->end(); ++cur)
+                        (*mergedFields)[cur->first] = cur->second;
+                }
+
+                SPSnap queued(new Snap(typeName, typeId, mergedIds, mergedFields));
+
+                // The pending list and its version are only touched inside the critical section.
+                cs->Enter();
+
+                pending->push_back(queued);
+                pendingVer++;
+
+                cs->Leave();
+            }
+
+            /**
+             * Get the version of the last successfully processed batch of updates.
+             *
+             * @return Value of the version field, read after a memory fence.
+             */
+            int32_t PortableMetadataManager::GetVersion()
+            {
+                Memory::Fence();
+
+                const int32_t current = ver;
+
+                return current;
+            }
+
+            /**
+             * Tell whether new metadata has been submitted after the given version.
+             *
+             * @param oldVer Version to compare against.
+             * @return True if the pending version, read after a memory fence, is greater than oldVer.
+             */
+            bool PortableMetadataManager::IsUpdatedSince(int32_t oldVer)
+            {
+                Memory::Fence();
+
+                const bool updated = pendingVer > oldVer;
+
+                return updated;
+            }
+
+            /**
+             * Push every pending snapshot through the updater and merge the accepted ones
+             * into the snapshot map.
+             *
+             * The whole operation runs inside the critical section. Pending snapshots are
+             * processed in order; processing stops at the first one the updater rejects.
+             * Only when all of them were accepted is the pending list cleared and the
+             * version advanced to the pending version. Snapshots accepted before a failure
+             * have already been merged into the map but stay in the pending list.
+             *
+             * @param updater Updater invoked for each pending snapshot.
+             * @param err Error object passed through to the updater.
+             * @return True if all pending snapshots were accepted, false otherwise.
+             */
+            bool PortableMetadataManager::ProcessPendingUpdates(PortableMetadataUpdater* updater, IgniteError* err)
+            {
+                bool success = true; // Optimistically assume that all will be fine.
+                
+                cs->Enter();
+
+                for (std::vector<SPSnap>::iterator it = pending->begin(); it != pending->end(); ++it)
+                {
+                    Snap* pendingSnap = (*it).Get();
+
+                    if (updater->Update(pendingSnap, err))
+                    {
+                        // Perform copy-on-write update of snapshot collection.
+                        std::map<int32_t, SPSnap>* newSnapshots = new std::map<int32_t, SPSnap>();
+                        
+                        bool snapshotFound = false;
+
+                        for (std::map<int32_t, SPSnap>::iterator snapIt = snapshots.Get()->begin();
+                            snapIt != snapshots.Get()->end(); ++snapIt)
+                        {
+                            int32_t curTypeId = snapIt->first;
+                            Snap* curSnap = snapIt->second.Get();
+
+                            if (pendingSnap->GetTypeId() == curTypeId)
+                            {
+                                // Have to create snapshot with updated fields.
+                                std::set<int32_t>* newFieldIds = new std::set<int32_t>();
+                                std::map<std::string, int32_t>* newFields = new std::map<std::string, int32_t>();
+
+                                // Add old fields.
+                                CopyFields(curSnap, newFieldIds, newFields);
+
+                                // Add new fields.
+                                CopyFields(pendingSnap, newFieldIds, newFields);
+                                
+                                // Create new snapshot.
+                                Snap* newSnap = new Snap(pendingSnap->GetTypeName(), pendingSnap->GetTypeId(), 
+                                    newFieldIds, newFields);
+
+                                (*newSnapshots)[curTypeId] = SPSnap(newSnap);
+
+                                snapshotFound = true;
+                            }
+                            else 
+                                (*newSnapshots)[curTypeId] = snapIt->second; // Just transfer existing snapshot.
+                        }
+
+                        // Handle situation when completely new snapshot is found.
+                        if (!snapshotFound)
+                            (*newSnapshots)[pendingSnap->GetTypeId()] = *it;
+
+                        // Replace the map held by the manager with the freshly built one.
+                        snapshots = SharedPointer<std::map<int32_t, SPSnap>>(newSnapshots);
+                    }
+                    else
+                    {
+                        // Stop as we cannot move further.
+                        success = false;
+
+                        break;
+                    }
+                }
+
+                if (success) 
+                {
+                    pending->erase(pending->begin(), pending->end());
+
+                    ver = pendingVer;
+                }
+
+                cs->Leave();
+
+                return success;
+            }
+
+            /**
+             * Copy all field IDs and fields of a snapshot into the given containers.
+             *
+             * Does nothing when the snapshot is NULL or has no fields. Existing map entries
+             * with the same field name are overwritten.
+             *
+             * @param snap Source snapshot (may be NULL).
+             * @param fieldIds Destination set of field IDs.
+             * @param fields Destination map of field name to type ID.
+             */
+            void PortableMetadataManager::CopyFields(Snap* snap, std::set<int32_t>* fieldIds,
+                std::map<std::string, int32_t>* fields)
+            {
+                if (!snap || !snap->HasFields())
+                    return;
+
+                std::set<int32_t>* srcIds = snap->GetFieldIds();
+                std::map<std::string, int32_t>* srcFields = snap->GetFields();
+
+                fieldIds->insert(srcIds->begin(), srcIds->end());
+
+                for (std::map<std::string, int32_t>::iterator cur = srcFields->begin();
+                    cur != srcFields->end(); ++cur)
+                    (*fields)[cur->first] = cur->second;
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fc06e01/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_snapshot.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_snapshot.cpp b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_snapshot.cpp
new file mode 100644
index 0000000..6ce5ab5
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_snapshot.cpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ignite/impl/portable/portable_metadata_snapshot.h"
+
+namespace ignite
+{    
+    namespace impl
+    {
+        namespace portable
+        {
+            /**
+             * Create a snapshot. The snapshot deletes both containers in its destructor.
+             *
+             * @param typeName Type name.
+             * @param typeId Type ID.
+             * @param fieldIds Set of field IDs.
+             * @param fields Map of field name to field type ID.
+             */
+            PortableMetadataSnapshot::PortableMetadataSnapshot(std::string typeName, int32_t typeId,
+                std::set<int32_t>* fieldIds, std::map<std::string, int32_t>* fields) :
+                typeName(typeName),
+                typeId(typeId),
+                fieldIds(fieldIds),
+                fields(fields)
+            {
+                // No-op.
+            }
+
+            /**
+             * Delete the field containers handed to the constructor.
+             */
+            PortableMetadataSnapshot::~PortableMetadataSnapshot()
+            {
+                delete this->fieldIds;
+                delete this->fields;
+            }
+
+            /**
+             * Check whether the snapshot contains the given field ID.
+             *
+             * @param fieldId Field ID.
+             * @return True if the field ID set exists and contains fieldId.
+             */
+            bool PortableMetadataSnapshot::ContainsFieldId(int32_t fieldId)
+            {
+                if (!fieldIds)
+                    return false;
+
+                return fieldIds->find(fieldId) != fieldIds->end();
+            }
+
+            /**
+             * Get the type name.
+             *
+             * @return Copy of the type name.
+             */
+            std::string PortableMetadataSnapshot::GetTypeName()
+            {
+                return this->typeName;
+            }
+
+            /**
+             * Get the type ID.
+             *
+             * @return Type ID.
+             */
+            int32_t PortableMetadataSnapshot::GetTypeId()
+            {
+                return this->typeId;
+            }
+
+            /**
+             * Tell whether the snapshot has at least one field.
+             *
+             * @return True if the field ID set exists and is not empty.
+             */
+            bool PortableMetadataSnapshot::HasFields()
+            {
+                // Guard against a NULL set, as ContainsFieldId does.
+                return fieldIds && !fieldIds->empty();
+            }
+
+            /**
+             * Get the set of field IDs.
+             *
+             * @return Pointer to the field ID set held by the snapshot.
+             */
+            std::set<int32_t>* PortableMetadataSnapshot::GetFieldIds()
+            {
+                return this->fieldIds;
+            }
+
+            /**
+             * Get the map of field name to field type ID.
+             *
+             * @return Pointer to the field map held by the snapshot.
+             */
+            std::map<std::string, int32_t>* PortableMetadataSnapshot::GetFields()
+            {
+                return this->fields;
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fc06e01/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater.cpp b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater.cpp
new file mode 100644
index 0000000..81c96d7
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater.cpp
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ignite/impl/portable/portable_metadata_updater.h"
+
+namespace ignite
+{    
+    namespace impl
+    {
+        namespace portable
+        {
+            /**
+             * Destructor. Releases nothing.
+             */
+            PortableMetadataUpdater::~PortableMetadataUpdater()
+            {
+                // No-op.
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/1fc06e01/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater_impl.cpp b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater_impl.cpp
new file mode 100644
index 0000000..07a1758
--- /dev/null
+++ b/modules/platform/src/main/cpp/core/src/impl/portable/portable_metadata_updater_impl.cpp
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ignite/impl/portable/portable_metadata_updater_impl.h"
+#include "ignite/impl/interop/interop_output_stream.h"
+#include "ignite/impl/portable/portable_writer_impl.h"
+#include "ignite/portable/portable_raw_writer.h"
+
+using namespace ignite::common::concurrent;
+using namespace ignite::common::java;
+using namespace ignite::impl;
+using namespace ignite::impl::interop;
+using namespace ignite::portable;
+
+namespace ignite
+{    
+    namespace impl
+    {
+        namespace portable
+        {
+            /** Operation: metadata update (the op code Update() passes to TargetInStreamOutLong). */
+            const int32_t OP_METADATA = -1;
+
+            PortableMetadataUpdaterImpl::PortableMetadataUpdaterImpl(SharedPointer<IgniteEnvironment> env,
+                jobject javaRef) :  env(env), javaRef(javaRef)
+            {
+                // No-op: both members are already set by the initializer list above.
+            }
+
+            PortableMetadataUpdaterImpl::~PortableMetadataUpdaterImpl()
+            {
+                // Intentionally empty: nothing is released here (javaRef is not touched by this body).
+            }
+
+            bool PortableMetadataUpdaterImpl::Update(Snap* snap, IgniteError* err) // Serializes snap and hands it to Java; true only on JNI success with result 1.
+            {
+                JniErrorInfo jniErr; // Passed to the Java call below and inspected afterwards.
+
+                SharedPointer<InteropMemory> mem = env.Get()->AllocateMemory(); // Memory chunk backing the output stream.
+
+                InteropOutputStream out(mem.Get());
+                PortableWriterImpl writer(&out, NULL);
+                PortableRawWriter rawWriter(&writer); // All writes below go through rawWriter -> writer -> out.
+
+                // Number of metadata entries: always exactly one per call in the current implementation, for simplicity.
+                rawWriter.WriteInt32(1);
+
+                rawWriter.WriteInt32(snap->GetTypeId());
+                rawWriter.WriteString(snap->GetTypeName());
+                rawWriter.WriteString(NULL); // Affinity key is not supported for now, so NULL is written in its place.
+                
+                if (snap->HasFields())
+                {
+                    std::map<std::string, int32_t>* fields = snap->GetFields();
+
+                    rawWriter.WriteInt32(static_cast<int32_t>(fields->size())); // Field count, then one (string key, int32 value) pair per map entry.
+
+                    for (std::map<std::string, int32_t>::iterator it = fields->begin(); it != fields->end(); ++it)
+                    {
+                        rawWriter.WriteString(it->first);
+                        rawWriter.WriteInt32(it->second);
+                    }
+                }
+                else
+                    rawWriter.WriteInt32(0); // No fields: a zero count is written instead.
+
+                out.Synchronize(); // NOTE(review): presumably publishes the written data to mem before the handoff - confirm.
+
+                long long res = env.Get()->Context()->TargetInStreamOutLong(javaRef, OP_METADATA, mem.Get()->PointerLong(), &jniErr);
+
+                IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); // Called unconditionally, i.e. also when the JNI code is success.
+
+                if (jniErr.code == IGNITE_JNI_ERR_SUCCESS)
+                    return res == 1; // Any result other than 1 is reported as false.
+                else
+                    return false; // JNI failure: details were handed to SetError above.
+            }
+        }
+    }
+}
\ No newline at end of file


Mime
View raw message