ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From isap...@apache.org
Subject [1/2] ignite git commit: IGNITE-3355: Implemented Compute::Call() for C++
Date Mon, 29 May 2017 14:30:47 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 0f8af137c -> f9c96de57


http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h
new file mode 100644
index 0000000..e218e36
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::impl::compute::ComputeJobHolder class template.
+ */
+
+#ifndef _IGNITE_IMPL_COMPUTE_COMPUTE_JOB_HOLDER
+#define _IGNITE_IMPL_COMPUTE_COMPUTE_JOB_HOLDER
+
+#include <ignite/impl/binary/binary_writer_impl.h>
+#include <ignite/impl/compute/compute_job_result.h>
+
+namespace ignite
+{
+    namespace impl
+    {
+        namespace compute
+        {
+            /**
+             * Compute job holder. Internal helper class.
+             * Used to handle jobs in general way, without specific types.
+             */
+            class ComputeJobHolder
+            {
+            public:
+                /**
+                 * Destructor.
+                 */
+                virtual ~ComputeJobHolder()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Execute job locally.
+                 */
+                virtual void ExecuteLocal() = 0;
+
+                /**
+                 * Execute job remote.
+                 *
+                 * @param writer Writer.
+                 */
+                virtual void ExecuteRemote(binary::BinaryWriterImpl& writer) = 0;
+            };
+
+            /**
+             * Compute job holder. Internal class.
+             *
+             * @tparam F Actual job type.
+             * @tparam R Job return type.
+             */
+            template<typename F, typename R>
+            class ComputeJobHolderImpl : public ComputeJobHolder
+            {
+            public:
+                typedef R ResultType;
+                typedef F JobType;
+
+                /**
+                 * Constructor.
+                 *
+                 * @param job Job.
+                 */
+                ComputeJobHolderImpl(JobType job) :
+                    job(job)
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Destructor.
+                 */
+                virtual ~ComputeJobHolderImpl()
+                {
+                    // No-op.
+                }
+
+                const ComputeJobResult<ResultType>& GetResult()
+                {
+                    return res;
+                }
+
+                virtual void ExecuteLocal()
+                {
+                    try
+                    {
+                        res.SetResult(job.Call());
+                    }
+                    catch (const IgniteError& err)
+                    {
+                        res.SetError(err);
+                    }
+                    catch (const std::exception& err)
+                    {
+                        res.SetError(IgniteError(IgniteError::IGNITE_ERR_STD, err.what()));
+                    }
+                    catch (...)
+                    {
+                        res.SetError(IgniteError(IgniteError::IGNITE_ERR_UNKNOWN,
+                            "Unknown error occurred during call."));
+                    }
+                }
+
+                virtual void ExecuteRemote(binary::BinaryWriterImpl& writer)
+                {
+                    ExecuteLocal();
+
+                    res.Write(writer);
+                }
+
+            private:
+                /** Result. */
+                ComputeJobResult<ResultType> res;
+
+                /** Job. */
+                JobType job;
+            };
+        }
+    }
+}
+
+#endif //_IGNITE_IMPL_COMPUTE_COMPUTE_JOB_HOLDER

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
new file mode 100644
index 0000000..5bcb762
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::impl::compute::ComputeJobResult class template.
+ */
+
+#ifndef _IGNITE_IMPL_COMPUTE_COMPUTE_JOB_RESULT
+#define _IGNITE_IMPL_COMPUTE_COMPUTE_JOB_RESULT
+
+#include <memory>
+#include <sstream>
+
+#include <ignite/common/promise.h>
+
+namespace ignite
+{
+    namespace impl
+    {
+        namespace compute
+        {
+            /**
+             * Used to hold compute job result.
+             */
+            template<typename R>
+            class ComputeJobResult
+            {
+            public:
+                typedef R ResultType;
+                /**
+                 * Default constructor.
+                 */
+                ComputeJobResult() :
+                    res(),
+                    err()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Set result value.
+                 *
+                 * @param val Value to set as a result.
+                 */
+                void SetResult(const ResultType& val)
+                {
+                    res = val;
+                }
+
+                /**
+                 * Set error.
+                 *
+                 * @param error Error to set.
+                 */
+                void SetError(const IgniteError error)
+                {
+                    err = error;
+                }
+
+                /**
+                 * Set promise to a state which corresponds to result.
+                 *
+                 * @param promise Promise, which state to set.
+                 */
+                void SetPromise(common::Promise<ResultType>& promise)
+                {
+                    if (err.GetCode() != IgniteError::IGNITE_SUCCESS)
+                        promise.SetError(err);
+                    else
+                        promise.SetValue(std::auto_ptr<ResultType>(new ResultType(res)));
+                }
+
+                /**
+                 * Write using writer.
+                 *
+                 * @param writer Writer.
+                 */
+                void Write(binary::BinaryWriterImpl& writer)
+                {
+                    if (err.GetCode() != IgniteError::IGNITE_SUCCESS)
+                    {
+                        // Fail
+                        writer.WriteBool(false);
+
+                        // Native Exception
+                        writer.WriteBool(true);
+
+                        writer.WriteObject<IgniteError>(err);
+                    }
+                    else
+                    {
+                        // Success
+                        writer.WriteBool(true);
+
+                        writer.WriteObject<ResultType>(res);
+                    }
+                }
+
+                /**
+                 * Read using reader.
+                 *
+                 * @param reader Reader.
+                 */
+                void Read(binary::BinaryReaderImpl& reader)
+                {
+                    bool success = reader.ReadBool();
+
+                    if (success)
+                    {
+                        res = reader.ReadObject<ResultType>();
+
+                        err = IgniteError();
+                    }
+                    else
+                    {
+                        bool native = reader.ReadBool();
+
+                        if (native)
+                            err = reader.ReadObject<IgniteError>();
+                        else
+                        {
+                            std::stringstream buf;
+
+                            buf << reader.ReadObject<std::string>() << " : ";
+                            buf << reader.ReadObject<std::string>() << ", ";
+                            buf << reader.ReadObject<std::string>();
+
+                            std::string msg = buf.str();
+
+                            err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, msg.c_str());
+                        }
+                    }
+                }
+
+            private:
+                /** Result. */
+                ResultType res;
+
+                /** Erorr. */
+                IgniteError err;
+            };
+        }
+    }
+}
+
+#endif //_IGNITE_IMPL_COMPUTE_COMPUTE_JOB_RESULT

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
new file mode 100644
index 0000000..bdd7513
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::impl::compute::ComputeTaskHolder class and
+ * ignite::impl::compute::ComputeTaskHolderImpl class template.
+ */
+
+#ifndef _IGNITE_IMPL_COMPUTE_COMPUTE_TASK_IMPL
+#define _IGNITE_IMPL_COMPUTE_COMPUTE_TASK_IMPL
+
+#include <stdint.h>
+
+#include <ignite/common/promise.h>
+#include <ignite/impl/compute/compute_job_result.h>
+#include <ignite/impl/compute/compute_job_holder.h>
+
+namespace ignite
+{
+    namespace impl
+    {
+        namespace compute
+        {
+            struct ComputeJobResultPolicy
+            {
+                enum Type
+                {
+                    /**
+                     * Wait for results if any are still expected. If all results have been received -
+                     * it will start reducing results.
+                     */
+                    WAIT = 0,
+
+                    /**
+                     * Ignore all not yet received results and start reducing results.
+                     */
+                    REDUCE = 1,
+
+                    /**
+                     * Fail-over job to execute on another node.
+                     */
+                    FAILOVER = 2
+                };
+            };
+
+            /**
+             * Compute task holder. Internal helper class.
+             * Used to handle tasks in general way, without specific types.
+             */
+            class ComputeTaskHolder
+            {
+            public:
+                /**
+                 * Constructor.
+                 *
+                 * @param handle Job handle.
+                 */
+                ComputeTaskHolder(int64_t handle) :
+                    handle(handle)
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Destructor.
+                 */
+                virtual ~ComputeTaskHolder()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Process local job result.
+                 *
+                 * @param job Job.
+                 * @return Policy.
+                 */
+                virtual int32_t JobResultLocal(ComputeJobHolder& job) = 0;
+
+                /**
+                 * Process remote job result.
+                 *
+                 * @param job Job.
+                 * @param reader Reader for stream with result.
+                 * @return Policy.
+                 */
+                virtual int32_t JobResultRemote(ComputeJobHolder& job, binary::BinaryReaderImpl& reader) = 0;
+
+                /**
+                 * Reduce results of related jobs.
+                 */
+                virtual void Reduce() = 0;
+
+                /**
+                 * Get related job handle.
+                 *
+                 * @return Job handle.
+                 */
+                int64_t GetJobHandle()
+                {
+                    return handle;
+                }
+
+            private:
+                /** Related job handle. */
+                int64_t handle;
+            };
+
+            /**
+             * Compute task holder type-specific implementation.
+             */
+            template<typename F, typename R>
+            class ComputeTaskHolderImpl : public ComputeTaskHolder
+            {
+            public:
+                typedef F JobType;
+                typedef R ResultType;
+
+                /**
+                 * Constructor.
+                 *
+                 * @param handle Job handle.
+                 */
+                ComputeTaskHolderImpl(int64_t handle) :
+                    ComputeTaskHolder(handle)
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Destructor.
+                 */
+                virtual ~ComputeTaskHolderImpl()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Process local job result.
+                 *
+                 * @param job Job.
+                 * @return Policy.
+                 */
+                virtual int32_t JobResultLocal(ComputeJobHolder& job)
+                {
+                    typedef ComputeJobHolderImpl<JobType, ResultType> ActualComputeJobHolder;
+
+                    ActualComputeJobHolder& job0 = static_cast<ActualComputeJobHolder&>(job);
+
+                    res = job0.GetResult();
+
+                    return ComputeJobResultPolicy::WAIT;
+                }
+
+                /**
+                 * Process remote job result.
+                 *
+                 * @param job Job.
+                 * @param reader Reader for stream with result.
+                 * @return Policy.
+                 */
+                virtual int32_t JobResultRemote(ComputeJobHolder& job, binary::BinaryReaderImpl& reader)
+                {
+                    res.Read(reader);
+
+                    return ComputeJobResultPolicy::WAIT;
+                }
+
+                /**
+                 * Reduce results of related jobs.
+                 */
+                virtual void Reduce()
+                {
+                    res.SetPromise(promise);
+                }
+
+                /**
+                 * Get result promise.
+                 *
+                 * @return Reference to result promise.
+                 */
+                common::Promise<ResultType>& GetPromise()
+                {
+                    return promise;
+                }
+
+            private:
+                /** Result. */
+                ComputeJobResult<ResultType> res;
+
+                /** Task result promise. */
+                common::Promise<ResultType> promise;
+            };
+        }
+    }
+}
+
+#endif //_IGNITE_IMPL_COMPUTE_COMPUTE_TASK_IMPL

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h
index a99855a..d0de432 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h
@@ -52,6 +52,8 @@ namespace ignite
                     CACHE_ENTRY_FILTER_CREATE = 2,
 
                     CACHE_ENTRY_FILTER_APPLY = 3,
+
+                    COMPUTE_JOB_CREATE = 4,
                 };
             };
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
index e3cb859..13f7b80 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
@@ -209,6 +209,75 @@ namespace ignite
              */
             common::concurrent::SharedPointer<IgniteBindingImpl> GetBinding() const;
 
+            /**
+             * Get processor compute.
+             *
+             * @param proj Projection.
+             * @return Processor compute.
+             */
+            jobject GetProcessorCompute(jobject proj);
+
+            /**
+             * Locally execute compute job.
+             *
+             * @param jobHandle Job handle.
+             */
+            void ComputeJobExecuteLocal(int64_t jobHandle);
+
+            /**
+             * Locally commit job execution result for the task.
+             *
+             * @param taskHandle Task handle.
+             * @param jobHandle Job handle.
+             * @return Reduce politics.
+             */
+            int32_t ComputeTaskLocalJobResult(int64_t taskHandle, int64_t jobHandle);
+
+            /**
+             * Reduce compute task.
+             *
+             * @param taskHandle Task handle.
+             */
+            void ComputeTaskReduce(int64_t taskHandle);
+
+            /**
+             * Complete compute task.
+             *
+             * @param taskHandle Task handle.
+             */
+            void ComputeTaskComplete(int64_t taskHandle);
+
+            /**
+             * Create compute job.
+             *
+             * @param mem Memory.
+             * @return Job handle.
+             */
+            int64_t ComputeJobCreate(common::concurrent::SharedPointer<interop::InteropMemory>& mem);
+
+            /**
+             * Execute compute job.
+             *
+             * @param mem Memory.
+             * @return Job handle.
+             */
+            void ComputeJobExecute(common::concurrent::SharedPointer<interop::InteropMemory>& mem);
+
+            /**
+             * Destroy compute job.
+             *
+             * @param jobHandle Job handle to destroy.
+             */
+            void ComputeJobDestroy(int64_t jobHandle);
+
+            /**
+             * Consume result of remote job execution.
+             *
+             * @param mem Memory containing result.
+             * @return Reduce policy.
+             */
+            int32_t ComputeTaskJobResult(common::concurrent::SharedPointer<interop::InteropMemory>& mem);
+
         private:
             /** Node configuration. */
             IgniteConfiguration* cfg;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h
index 5b1f527..baddec4 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h
@@ -22,10 +22,11 @@
 #include <ignite/jni/java.h>
 #include <ignite/common/utils.h>
 
+#include <ignite/impl/ignite_environment.h>
 #include <ignite/impl/cache/cache_impl.h>
 #include <ignite/impl/transactions/transactions_impl.h>
 #include <ignite/impl/cluster/cluster_group_impl.h>
-#include <ignite/impl/ignite_environment.h>
+#include <ignite/impl/compute/compute_impl.h>
 
 namespace ignite 
 {
@@ -38,7 +39,8 @@ namespace ignite
         {
             typedef common::concurrent::SharedPointer<IgniteEnvironment> SP_IgniteEnvironment;
             typedef common::concurrent::SharedPointer<transactions::TransactionsImpl> SP_TransactionsImpl;
-            typedef common::concurrent::SharedPointer<cluster::ClusterGroupImpl> SP_ClusterGroupImpl;
+            typedef common::concurrent::SharedPointer<compute::ComputeImpl> SP_ComputeImpl;
+            typedef common::concurrent::SharedPointer<IgniteBindingImpl> SP_IgniteBindingImpl;
         public:
             /**
              * Constructor used to create new instance.
@@ -154,7 +156,7 @@ namespace ignite
              *
              * @return IgniteBinding class instance.
              */
-            common::concurrent::SharedPointer<IgniteBindingImpl> GetBinding();
+            SP_IgniteBindingImpl GetBinding();
 
             /**
              * Get instance of the implementation from the proxy class.
@@ -185,7 +187,7 @@ namespace ignite
              *
              * @return TransactionsImpl instance.
              */
-            SP_TransactionsImpl GetTransactions() const
+            SP_TransactionsImpl GetTransactions()
             {
                 return txImpl;
             }
@@ -195,11 +197,18 @@ namespace ignite
              *
              * @return ClusterGroupImpl instance.
              */
-            SP_ClusterGroupImpl GetProjection() const
+            cluster::SP_ClusterGroupImpl GetProjection()
             {
                 return prjImpl;
             }
 
+            /**
+             * Get compute.
+             *
+             * @return ComputeImpl instance.
+             */
+            SP_ComputeImpl GetCompute();
+
         private:
             /**
              * Get transactions internal call.
@@ -213,7 +222,7 @@ namespace ignite
              *
              * @return ClusterGroupImpl instance.
              */
-            SP_ClusterGroupImpl InternalGetProjection(IgniteError &err);
+            cluster::SP_ClusterGroupImpl InternalGetProjection(IgniteError &err);
 
             /** Environment. */
             SP_IgniteEnvironment env;
@@ -225,7 +234,7 @@ namespace ignite
             SP_TransactionsImpl txImpl;
 
             /** Projection implementation. */
-            SP_ClusterGroupImpl prjImpl;
+            cluster::SP_ClusterGroupImpl prjImpl;
 
             IGNITE_NO_COPY_ASSIGNMENT(IgniteImpl)
         };

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h b/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h
index f9b2b7f..0384dcc 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h
@@ -62,7 +62,7 @@ namespace ignite
                 /**
                  * Destructor.
                  */
-                ~InteropTarget();
+                virtual ~InteropTarget();
 
                 /**
                  * Internal out operation.
@@ -135,6 +135,15 @@ namespace ignite
                 OperationResult::Type InStreamOutLong(int32_t opType, InteropMemory& outInMem, IgniteError& err);
 
                 /**
+                 * In stream out object operation.
+                 *
+                 * @param opType Type of operation.
+                 * @param outInMem Input and output memory.
+                 * @return Java object references.
+                 */
+                jobject InStreamOutObject(int32_t opType, InteropMemory& outInMem);
+
+                /**
                 * Internal out-in operation.
                 *
                 * @param opType Operation type.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/project/vs/core.vcxproj
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj b/modules/platforms/cpp/core/project/vs/core.vcxproj
index 5cd49f3..9911ffe 100644
--- a/modules/platforms/cpp/core/project/vs/core.vcxproj
+++ b/modules/platforms/cpp/core/project/vs/core.vcxproj
@@ -208,6 +208,8 @@
     <ClInclude Include="..\..\include\ignite\cache\query\query_sql.h" />
     <ClInclude Include="..\..\include\ignite\cache\query\query_sql_fields.h" />
     <ClInclude Include="..\..\include\ignite\cache\query\query_text.h" />
+    <ClInclude Include="..\..\include\ignite\compute\compute.h" />
+    <ClInclude Include="..\..\include\ignite\compute\compute_func.h" />
     <ClInclude Include="..\..\include\ignite\ignite_binding_context.h" />
     <ClInclude Include="..\..\include\ignite\ignite.h" />
     <ClInclude Include="..\..\include\ignite\ignite_configuration.h" />
@@ -225,6 +227,11 @@
     <ClInclude Include="..\..\include\ignite\impl\cache\query\query_fields_row_impl.h" />
     <ClInclude Include="..\..\include\ignite\impl\cache\query\query_impl.h" />
     <ClInclude Include="..\..\include\ignite\impl\cluster\cluster_group_impl.h" />
+    <ClInclude Include="..\..\include\ignite\impl\compute\cancelable_impl.h" />
+    <ClInclude Include="..\..\include\ignite\impl\compute\compute_impl.h" />
+    <ClInclude Include="..\..\include\ignite\impl\compute\compute_job_holder.h" />
+    <ClInclude Include="..\..\include\ignite\impl\compute\compute_job_result.h" />
+    <ClInclude Include="..\..\include\ignite\impl\compute\compute_task_holder.h" />
     <ClInclude Include="..\..\include\ignite\impl\helpers.h" />
     <ClInclude Include="..\..\include\ignite\impl\ignite_environment.h" />
     <ClInclude Include="..\..\include\ignite\impl\ignite_impl.h" />
@@ -251,6 +258,8 @@
     <ClCompile Include="..\..\src\impl\cache\query\continuous\continuous_query_handle_impl.cpp" />
     <ClCompile Include="..\..\src\impl\cache\query\query_impl.cpp" />
     <ClCompile Include="..\..\src\impl\cluster\cluster_group_impl.cpp" />
+    <ClCompile Include="..\..\src\impl\compute\cancelable_impl.cpp" />
+    <ClCompile Include="..\..\src\impl\compute\compute_impl.cpp" />
     <ClCompile Include="..\..\src\impl\ignite_binding_impl.cpp" />
     <ClCompile Include="..\..\src\impl\ignite_environment.cpp" />
     <ClCompile Include="..\..\src\impl\ignite_impl.cpp" />

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
index 98099a9..7b84494 100644
--- a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
+++ b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
@@ -55,6 +55,12 @@
     <ClCompile Include="..\..\src\impl\ignite_binding_impl.cpp">
       <Filter>Code\impl</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\src\impl\compute\compute_impl.cpp">
+      <Filter>Code\impl\compute</Filter>
+    </ClCompile>
+    <ClCompile Include="..\..\src\impl\compute\cancelable_impl.cpp">
+      <Filter>Code\impl\compute</Filter>
+    </ClCompile>
   </ItemGroup>
   <ItemGroup>
     <ClInclude Include="..\..\include\ignite\impl\cache\cache_impl.h">
@@ -210,6 +216,27 @@
     <ClInclude Include="..\..\include\ignite\impl\helpers.h">
       <Filter>Code\impl</Filter>
     </ClInclude>
+    <ClInclude Include="..\..\include\ignite\compute\compute.h">
+      <Filter>Code\compute</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\include\ignite\impl\compute\compute_impl.h">
+      <Filter>Code\impl\compute</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\include\ignite\compute\compute_func.h">
+      <Filter>Code\compute</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\include\ignite\impl\compute\cancelable_impl.h">
+      <Filter>Code\impl\compute</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\include\ignite\impl\compute\compute_job_holder.h">
+      <Filter>Code\impl\compute</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\include\ignite\impl\compute\compute_job_result.h">
+      <Filter>Code\impl\compute</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\include\ignite\impl\compute\compute_task_holder.h">
+      <Filter>Code\impl\compute</Filter>
+    </ClInclude>
   </ItemGroup>
   <ItemGroup>
     <Filter Include="Code">
@@ -257,5 +284,11 @@
     <Filter Include="Code\impl\cache\event">
       <UniqueIdentifier>{9c5e9732-755a-4553-8926-b4cf3b6abaf3}</UniqueIdentifier>
     </Filter>
+    <Filter Include="Code\compute">
+      <UniqueIdentifier>{f1b7ced1-0e6e-4e07-a5b6-04b076797c6f}</UniqueIdentifier>
+    </Filter>
+    <Filter Include="Code\impl\compute">
+      <UniqueIdentifier>{ef20cfe1-cd30-429d-a241-575696df8399}</UniqueIdentifier>
+    </Filter>
   </ItemGroup>
 </Project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/ignite.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/ignite.cpp b/modules/platforms/cpp/core/src/ignite.cpp
index 2665916..9c42f1d 100644
--- a/modules/platforms/cpp/core/src/ignite.cpp
+++ b/modules/platforms/cpp/core/src/ignite.cpp
@@ -55,6 +55,11 @@ namespace ignite
         return transactions::Transactions(txImpl);
     }
 
+    compute::Compute Ignite::GetCompute()
+    {
+        return compute::Compute(impl.Get()->GetCompute());
+    }
+
     IgniteBinding Ignite::GetBinding()
     {
         return impl.Get()->GetBinding();

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp b/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp
index 1bddeac..c34e828 100644
--- a/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp
@@ -37,7 +37,7 @@ namespace ignite
             ClusterGroupImpl::ClusterGroupImpl(SP_IgniteEnvironment env, jobject javaRef) :
                 InteropTarget(env, javaRef)
             {
-                // No-op.
+                computeImpl = InternalGetCompute();
             }
 
             ClusterGroupImpl::~ClusterGroupImpl()
@@ -45,22 +45,33 @@ namespace ignite
                 // No-op.
             }
 
-            ClusterGroupImpl::SP_ClusterGroupImpl ClusterGroupImpl::ForServers(IgniteError& err)
+            SP_ClusterGroupImpl ClusterGroupImpl::ForServers()
             {
-                JniErrorInfo jniErr;
+                IgniteError err;
 
                 jobject res = InOpObject(Command::FOR_SERVERS, err);
 
-                if (jniErr.code != java::IGNITE_JNI_ERR_SUCCESS)
-                    return SP_ClusterGroupImpl();
+                IgniteError::ThrowIfNeeded(err);
 
                 return FromTarget(res);
             }
 
-            ClusterGroupImpl::SP_ClusterGroupImpl ClusterGroupImpl::FromTarget(jobject javaRef)
+            ClusterGroupImpl::SP_ComputeImpl ClusterGroupImpl::GetCompute()
+            {
+                return computeImpl;
+            }
+
+            SP_ClusterGroupImpl ClusterGroupImpl::FromTarget(jobject javaRef)
             {
                 return SP_ClusterGroupImpl(new ClusterGroupImpl(GetEnvironmentPointer(), javaRef));
             }
+
+            ClusterGroupImpl::SP_ComputeImpl ClusterGroupImpl::InternalGetCompute()
+            {
+                jobject computeProc = GetEnvironment().GetProcessorCompute(GetTarget());
+
+                return SP_ComputeImpl(new compute::ComputeImpl(GetEnvironmentPointer(), computeProc));
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/impl/compute/cancelable_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/compute/cancelable_impl.cpp b/modules/platforms/cpp/core/src/impl/compute/cancelable_impl.cpp
new file mode 100644
index 0000000..6e61cc8
--- /dev/null
+++ b/modules/platforms/cpp/core/src/impl/compute/cancelable_impl.cpp
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ignite/impl/compute/cancelable_impl.h>
+
+using namespace ignite::common::concurrent;
+
+namespace
+{
+    /**
+     * Operation type.
+     */
+    struct Operation
+    {
+        enum Type
+        {
+            Cancel = 1
+        };
+    };
+}
+
+namespace ignite
+{
+    namespace impl
+    {
+        namespace compute
+        {
+            CancelableImpl::CancelableImpl(SharedPointer<IgniteEnvironment> env, jobject javaRef) :
+                InteropTarget(env, javaRef),
+                Cancelable()
+            {
+                // No-op.
+            }
+
+            void CancelableImpl::Cancel()
+            {
+                IgniteError err;
+
+                OutInOpLong(Operation::Cancel, 0, err);
+
+                IgniteError::ThrowIfNeeded(err);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/impl/compute/compute_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/compute/compute_impl.cpp b/modules/platforms/cpp/core/src/impl/compute/compute_impl.cpp
new file mode 100644
index 0000000..591dd1f
--- /dev/null
+++ b/modules/platforms/cpp/core/src/impl/compute/compute_impl.cpp
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <ignite/impl/compute/compute_impl.h>
+
+using namespace ignite::common::concurrent;
+
+namespace ignite
+{
+    namespace impl
+    {
+        namespace compute
+        {
+            ComputeImpl::ComputeImpl(SharedPointer<IgniteEnvironment> env, jobject javaRef) :
+                InteropTarget(env, javaRef)
+            {
+                // No-op.
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
index 2231003..4e78f09 100644
--- a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
+++ b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp
@@ -20,10 +20,10 @@
 #include <ignite/impl/binary/binary_type_updater_impl.h>
 #include <ignite/impl/module_manager.h>
 #include <ignite/impl/ignite_binding_impl.h>
+#include <ignite/impl/compute/compute_task_holder.h>
 
 #include <ignite/binary/binary.h>
 #include <ignite/cache/query/continuous/continuous_query.h>
-#include <ignite/ignite_binding.h>
 #include <ignite/ignite_binding_context.h>
 
 #include <ignite/impl/ignite_environment.h>
@@ -47,13 +47,21 @@ namespace ignite
             enum Type
             {
                 CACHE_INVOKE = 8,
+                COMPUTE_TASK_JOB_RESULT = 10,
+                COMPUTE_TASK_REDUCE = 11,
+                COMPUTE_TASK_COMPLETE = 12,
+                COMPUTE_JOB_CREATE = 14,
+                COMPUTE_JOB_EXECUTE = 15,
+                COMPUTE_JOB_DESTROY = 17,
                 CONTINUOUS_QUERY_LISTENER_APPLY = 18,
                 CONTINUOUS_QUERY_FILTER_CREATE = 19,
                 CONTINUOUS_QUERY_FILTER_APPLY = 20,
                 CONTINUOUS_QUERY_FILTER_RELEASE = 21,
                 REALLOC = 36,
                 ON_START = 49,
-                ON_STOP = 50 
+                ON_STOP = 50,
+                COMPUTE_TASK_LOCAL_JOB_RESULT = 60,
+                COMPUTE_JOB_EXECUTE_LOCAL = 61
             };
         };
 
@@ -78,6 +86,47 @@ namespace ignite
                     break;
                 }
 
+                case OperationCallback::COMPUTE_JOB_CREATE:
+                {
+                    SharedPointer<InteropMemory> mem = env->Get()->GetMemory(val);
+
+                    res = env->Get()->ComputeJobCreate(mem);
+
+                    break;
+                }
+
+                case OperationCallback::COMPUTE_JOB_EXECUTE:
+                {
+                    SharedPointer<InteropMemory> mem = env->Get()->GetMemory(val);
+
+                    env->Get()->ComputeJobExecute(mem);
+
+                    break;
+                }
+
+                case OperationCallback::COMPUTE_JOB_DESTROY:
+                {
+                    env->Get()->ComputeJobDestroy(val);
+
+                    break;
+                }
+
+                case OperationCallback::COMPUTE_TASK_JOB_RESULT:
+                {
+                    SharedPointer<InteropMemory> mem = env->Get()->GetMemory(val);
+
+                    res = env->Get()->ComputeTaskJobResult(mem);
+
+                    break;
+                }
+
+                case OperationCallback::COMPUTE_TASK_REDUCE:
+                {
+                    env->Get()->ComputeTaskReduce(val);
+
+                    break;
+                }
+
                 case OperationCallback::CONTINUOUS_QUERY_LISTENER_APPLY:
                 {
                     SharedPointer<InteropMemory> mem = env->Get()->GetMemory(val);
@@ -142,10 +191,32 @@ namespace ignite
         long long IGNITE_CALL InLongLongLongObjectOutLong(void* target, int type, long long val1, long long val2, 
             long long val3, void* arg)
         {
+            int64_t res = 0;
             SharedPointer<IgniteEnvironment>* env = static_cast<SharedPointer<IgniteEnvironment>*>(target);
 
             switch (type)
             {
+                case OperationCallback::COMPUTE_JOB_EXECUTE_LOCAL:
+                {
+                    env->Get()->ComputeJobExecuteLocal(val1);
+
+                    break;
+                }
+
+                case OperationCallback::COMPUTE_TASK_LOCAL_JOB_RESULT:
+                {
+                    res = env->Get()->ComputeTaskLocalJobResult(val1, val2);
+
+                    break;
+                }
+
+                case OperationCallback::COMPUTE_TASK_COMPLETE:
+                {
+                    env->Get()->ComputeTaskComplete(val1);
+
+                    break;
+                }
+
                 case OperationCallback::ON_START:
                 {
                     env->Get()->OnStartCallback(val1, reinterpret_cast<jobject>(arg));
@@ -168,7 +239,7 @@ namespace ignite
                 }
             }
 
-            return 0;
+            return res;
         }
 
         IgniteEnvironment::IgniteEnvironment(const IgniteConfiguration& cfg) :
@@ -300,6 +371,189 @@ namespace ignite
             return binding;
         }
 
+        jobject IgniteEnvironment::GetProcessorCompute(jobject proj)
+        {
+            JniErrorInfo jniErr;
+
+            jobject res = ctx.Get()->ProcessorCompute(proc.Get(), proj, &jniErr);
+
+            IgniteError err;
+
+            IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
+
+            IgniteError::ThrowIfNeeded(err);
+
+            return res;
+        }
+
+        void IgniteEnvironment::ComputeJobExecuteLocal(int64_t jobHandle)
+        {
+            SharedPointer<compute::ComputeJobHolder> job0 =
+                StaticPointerCast<compute::ComputeJobHolder>(registry.Get(jobHandle));
+
+            compute::ComputeJobHolder* job = job0.Get();
+
+            if (job)
+                job->ExecuteLocal();
+            else
+            {
+                IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION,
+                    "Job is not registred for handle", "jobHandle", jobHandle);
+            }
+        }
+
+        int32_t IgniteEnvironment::ComputeTaskLocalJobResult(int64_t taskHandle, int64_t jobHandle)
+        {
+            SharedPointer<compute::ComputeJobHolder> job0 =
+                StaticPointerCast<compute::ComputeJobHolder>(registry.Get(jobHandle));
+
+            compute::ComputeJobHolder* job = job0.Get();
+
+            SharedPointer<compute::ComputeTaskHolder> task0 =
+                StaticPointerCast<compute::ComputeTaskHolder>(registry.Get(taskHandle));
+
+            compute::ComputeTaskHolder* task = task0.Get();
+
+            if (task && job)
+                return task->JobResultLocal(*job);
+
+            if (!task)
+            {
+                IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION,
+                    "Task is not registred for handle", "taskHandle", taskHandle);
+            }
+
+            IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION,
+                "Job is not registred for handle", "jobHandle", jobHandle);
+        }
+
+        void IgniteEnvironment::ComputeTaskReduce(int64_t taskHandle)
+        {
+            SharedPointer<compute::ComputeTaskHolder> task0 =
+                StaticPointerCast<compute::ComputeTaskHolder>(registry.Get(taskHandle));
+
+            compute::ComputeTaskHolder* task = task0.Get();
+
+            if (task)
+                task->Reduce();
+            else
+            {
+                IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION,
+                    "Task is not registred for handle", "taskHandle", taskHandle);
+            }
+        }
+
+        void IgniteEnvironment::ComputeTaskComplete(int64_t taskHandle)
+        {
+            SharedPointer<compute::ComputeTaskHolder> task0 =
+                StaticPointerCast<compute::ComputeTaskHolder>(registry.Get(taskHandle));
+
+            compute::ComputeTaskHolder* task = task0.Get();
+
+            if (task)
+            {
+                registry.Release(task->GetJobHandle());
+                registry.Release(taskHandle);
+            }
+        }
+
+        int64_t IgniteEnvironment::ComputeJobCreate(SharedPointer<InteropMemory>& mem)
+        {
+            if (!binding.Get())
+                throw IgniteError(IgniteError::IGNITE_ERR_UNKNOWN, "IgniteBinding is not initialized.");
+
+            InteropInputStream inStream(mem.Get());
+            BinaryReaderImpl reader(&inStream);
+
+            InteropOutputStream outStream(mem.Get());
+            BinaryWriterImpl writer(&outStream, GetTypeManager());
+
+            BinaryObjectImpl binJob = BinaryObjectImpl::FromMemory(*mem.Get(), inStream.Position(), 0);
+
+            int32_t jobTypeId = binJob.GetTypeId();
+
+            bool invoked = false;
+
+            int64_t handle = binding.Get()->InvokeCallback(invoked,
+                IgniteBindingImpl::CallbackType::COMPUTE_JOB_CREATE, jobTypeId, reader, writer);
+
+            if (!invoked)
+            {
+                IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION,
+                    "C++ compute job is not registered on the node (did you compile your program without -rdynamic?).",
+                    "jobTypeId", jobTypeId);
+            }
+
+            return handle;
+        }
+
+        void IgniteEnvironment::ComputeJobExecute(SharedPointer<InteropMemory>& mem)
+        {
+            InteropInputStream inStream(mem.Get());
+
+            InteropOutputStream outStream(mem.Get());
+            BinaryWriterImpl writer(&outStream, GetTypeManager());
+
+            int64_t jobHandle = inStream.ReadInt64();
+
+            SharedPointer<compute::ComputeJobHolder> job0 =
+                StaticPointerCast<compute::ComputeJobHolder>(registry.Get(jobHandle));
+
+            compute::ComputeJobHolder* job = job0.Get();
+
+            if (job)
+                job->ExecuteRemote(writer);
+            else
+            {
+                IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION,
+                    "Job is not registred for handle", "jobHandle", jobHandle);
+            }
+
+            outStream.Synchronize();
+        }
+
+        void IgniteEnvironment::ComputeJobDestroy(int64_t jobHandle)
+        {
+            registry.Release(jobHandle);
+        }
+
+        int32_t IgniteEnvironment::ComputeTaskJobResult(SharedPointer<InteropMemory>& mem)
+        {
+            InteropInputStream inStream(mem.Get());
+            BinaryReaderImpl reader(&inStream);
+
+            int64_t taskHandle = reader.ReadInt64();
+            int64_t jobHandle = reader.ReadInt64();
+
+            // Node GUID
+            reader.ReadGuid();
+
+            // Cancel flag
+            reader.ReadBool();
+
+            SharedPointer<compute::ComputeJobHolder> job0 =
+                StaticPointerCast<compute::ComputeJobHolder>(registry.Get(jobHandle));
+
+            compute::ComputeJobHolder* job = job0.Get();
+
+            SharedPointer<compute::ComputeTaskHolder> task0 =
+                StaticPointerCast<compute::ComputeTaskHolder>(registry.Get(taskHandle));
+
+            compute::ComputeTaskHolder* task = task0.Get();
+
+            if (task && job)
+                return task->JobResultRemote(*job, reader);
+
+            if (!task)
+            {
+                IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION,
+                    "Task is not registred for handle", "taskHandle", taskHandle);
+            }
+
+            IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION,
+                "Job is not registred for handle", "jobHandle", jobHandle);
+        }
+
         void IgniteEnvironment::ProcessorReleaseStart()
         {
             if (proc.Get())

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/ignite_impl.cpp b/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
index 546cd01..16e954c 100644
--- a/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/ignite_impl.cpp
@@ -59,11 +59,18 @@ namespace ignite
             return env.Get()->Context();
         }
 
-        SharedPointer<IgniteBindingImpl> IgniteImpl::GetBinding()
+        IgniteImpl::SP_IgniteBindingImpl IgniteImpl::GetBinding()
         {
             return env.Get()->GetBinding();
         }
 
+        IgniteImpl::SP_ComputeImpl IgniteImpl::GetCompute()
+        {
+            cluster::SP_ClusterGroupImpl serversCluster = prjImpl.Get()->ForServers();
+
+            return serversCluster.Get()->GetCompute();
+        }
+
         IgniteImpl::SP_TransactionsImpl IgniteImpl::InternalGetTransactions(IgniteError &err)
         {
             SP_TransactionsImpl res;
@@ -80,16 +87,16 @@ namespace ignite
             return res;
         }
 
-        IgniteImpl::SP_ClusterGroupImpl IgniteImpl::InternalGetProjection(IgniteError& err)
+        cluster::SP_ClusterGroupImpl IgniteImpl::InternalGetProjection(IgniteError& err)
         {
-            SP_ClusterGroupImpl res;
+            cluster::SP_ClusterGroupImpl res;
 
             JniErrorInfo jniErr;
 
             jobject txJavaRef = env.Get()->Context()->ProcessorProjection(javaRef, &jniErr);
 
             if (txJavaRef)
-                res = SP_ClusterGroupImpl(new cluster::ClusterGroupImpl(env, txJavaRef));
+                res = cluster::SP_ClusterGroupImpl(new cluster::ClusterGroupImpl(env, txJavaRef));
             else
                 IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp b/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp
index b0932e7..7eed6f3 100644
--- a/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp
+++ b/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp
@@ -216,6 +216,26 @@ namespace ignite
                 return OperationResult::AI_ERROR;
             }
 
+            jobject InteropTarget::InStreamOutObject(int32_t opType, InteropMemory& outInMem)
+            {
+                JniErrorInfo jniErr;
+
+                int64_t outInPtr = outInMem.PointerLong();
+
+                if (outInPtr)
+                {
+                    jobject res = env.Get()->Context()->TargetInStreamOutObject(javaRef, opType, outInPtr, &jniErr);
+
+                    IgniteError err;
+                    IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
+                    IgniteError::ThrowIfNeeded(err);
+
+                    return res;
+                }
+
+                return 0;
+            }
+
             int64_t InteropTarget::OutInOpLong(int32_t opType, int64_t val, IgniteError& err)
             {
                 JniErrorInfo jniErr;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp b/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp
index 133b375..b9e976a 100644
--- a/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp
+++ b/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp
@@ -165,8 +165,6 @@ namespace ignite
 
             TransactionState::Type TransactionsImpl::TxCommit(int64_t id, IgniteError& err)
             {
-                JniErrorInfo jniErr;
-
                 int state = static_cast<int>(OutInOpLong(Operation::COMMIT, id, err));
 
                 return ToTransactionState(state);
@@ -174,8 +172,6 @@ namespace ignite
 
             TransactionState::Type TransactionsImpl::TxRollback(int64_t id, IgniteError& err)
             {
-                JniErrorInfo jniErr;
-
                 int state = static_cast<int>(OutInOpLong(Operation::ROLLBACK, id, err));
 
                 return ToTransactionState(state);
@@ -183,8 +179,6 @@ namespace ignite
 
             TransactionState::Type TransactionsImpl::TxClose(int64_t id, IgniteError& err)
             {
-                JniErrorInfo jniErr;
-
                 int state = static_cast<int>(OutInOpLong(Operation::CLOSE, id, err));
 
                 return ToTransactionState(state);
@@ -192,8 +186,6 @@ namespace ignite
 
             bool TransactionsImpl::TxSetRollbackOnly(int64_t id, IgniteError& err)
             {
-                JniErrorInfo jniErr;
-
                 bool rollbackOnly = OutInOpLong(Operation::SET_ROLLBACK_ONLY, id, err) == 1;
 
                 return rollbackOnly;
@@ -201,8 +193,6 @@ namespace ignite
 
             TransactionState::Type TransactionsImpl::TxState(int64_t id, IgniteError& err)
             {
-                JniErrorInfo jniErr;
-
                 int state = static_cast<int>(OutInOpLong(Operation::STATE, id, err));
 
                 return ToTransactionState(state);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/jni/include/ignite/jni/java.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/include/ignite/jni/java.h b/modules/platforms/cpp/jni/include/ignite/jni/java.h
index 85955b3..f6d7207 100644
--- a/modules/platforms/cpp/jni/include/ignite/jni/java.h
+++ b/modules/platforms/cpp/jni/include/ignite/jni/java.h
@@ -365,6 +365,7 @@ namespace ignite
                 jobject ProcessorDataStreamer(jobject obj, const char* name, bool keepPortable);
                 jobject ProcessorTransactions(jobject obj, JniErrorInfo* errInfo = NULL);
                 jobject ProcessorCompute(jobject obj, jobject prj);
+                jobject ProcessorCompute(jobject obj, jobject prj, JniErrorInfo* errInfo);
                 jobject ProcessorMessage(jobject obj, jobject prj);
                 jobject ProcessorEvents(jobject obj, jobject prj);
                 jobject ProcessorServices(jobject obj, jobject prj);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/jni/src/java.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/jni/src/java.cpp b/modules/platforms/cpp/jni/src/java.cpp
index 809aa17..bc6af34 100644
--- a/modules/platforms/cpp/jni/src/java.cpp
+++ b/modules/platforms/cpp/jni/src/java.cpp
@@ -1154,6 +1154,16 @@ namespace ignite
                 return LocalToGlobal(env, res);
             }
 
+            jobject JniContext::ProcessorCompute(jobject obj, jobject prj, JniErrorInfo* errInfo) {
+                JNIEnv* env = Attach();
+
+                jobject res = env->CallObjectMethod(obj, jvm->GetMembers().m_PlatformProcessor_compute, prj);
+
+                ExceptionCheck(env, errInfo);
+
+                return LocalToGlobal(env, res);
+            }
+
             jobject JniContext::ProcessorMessage(jobject obj, jobject prj) {
                 JNIEnv* env = Attach();
 


Mime
View raw message