Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C81C1200C84 for ; Mon, 29 May 2017 16:30:49 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id C6C84160BC2; Mon, 29 May 2017 14:30:49 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EC989160BCE for ; Mon, 29 May 2017 16:30:47 +0200 (CEST) Received: (qmail 60093 invoked by uid 500); 29 May 2017 14:30:47 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 60083 invoked by uid 99); 29 May 2017 14:30:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 May 2017 14:30:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 07A79DFF66; Mon, 29 May 2017 14:30:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: isapego@apache.org To: commits@ignite.apache.org Date: Mon, 29 May 2017 14:30:47 -0000 Message-Id: <8304d0ae1f914c138d64675c4df395c8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] ignite git commit: IGNITE-3355: Implemented Compute::Call() for C++ archived-at: Mon, 29 May 2017 14:30:50 -0000 Repository: ignite Updated Branches: refs/heads/master 0f8af137c -> f9c96de57 http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h new file mode 100644 index 0000000..e218e36 --- /dev/null +++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_holder.h @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * Declares ignite::impl::compute::ComputeJobHolder class template. + */ + +#ifndef _IGNITE_IMPL_COMPUTE_COMPUTE_JOB_HOLDER +#define _IGNITE_IMPL_COMPUTE_COMPUTE_JOB_HOLDER + +#include +#include + +namespace ignite +{ + namespace impl + { + namespace compute + { + /** + * Compute job holder. Internal helper class. + * Used to handle jobs in general way, without specific types. + */ + class ComputeJobHolder + { + public: + /** + * Destructor. + */ + virtual ~ComputeJobHolder() + { + // No-op. + } + + /** + * Execute job locally. + */ + virtual void ExecuteLocal() = 0; + + /** + * Execute job remote. + * + * @param writer Writer. + */ + virtual void ExecuteRemote(binary::BinaryWriterImpl& writer) = 0; + }; + + /** + * Compute job holder. Internal class. + * + * @tparam F Actual job type. + * @tparam R Job return type. + */ + template + class ComputeJobHolderImpl : public ComputeJobHolder + { + public: + typedef R ResultType; + typedef F JobType; + + /** + * Constructor. + * + * @param job Job. + */ + ComputeJobHolderImpl(JobType job) : + job(job) + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~ComputeJobHolderImpl() + { + // No-op. + } + + const ComputeJobResult& GetResult() + { + return res; + } + + virtual void ExecuteLocal() + { + try + { + res.SetResult(job.Call()); + } + catch (const IgniteError& err) + { + res.SetError(err); + } + catch (const std::exception& err) + { + res.SetError(IgniteError(IgniteError::IGNITE_ERR_STD, err.what())); + } + catch (...) + { + res.SetError(IgniteError(IgniteError::IGNITE_ERR_UNKNOWN, + "Unknown error occurred during call.")); + } + } + + virtual void ExecuteRemote(binary::BinaryWriterImpl& writer) + { + ExecuteLocal(); + + res.Write(writer); + } + + private: + /** Result. */ + ComputeJobResult res; + + /** Job. */ + JobType job; + }; + } + } +} + +#endif //_IGNITE_IMPL_COMPUTE_COMPUTE_JOB_HOLDER http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h new file mode 100644 index 0000000..5bcb762 --- /dev/null +++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * Declares ignite::impl::compute::ComputeJobResult class template. + */ + +#ifndef _IGNITE_IMPL_COMPUTE_COMPUTE_JOB_RESULT +#define _IGNITE_IMPL_COMPUTE_COMPUTE_JOB_RESULT + +#include +#include + +#include + +namespace ignite +{ + namespace impl + { + namespace compute + { + /** + * Used to hold compute job result. + */ + template + class ComputeJobResult + { + public: + typedef R ResultType; + /** + * Default constructor. + */ + ComputeJobResult() : + res(), + err() + { + // No-op. + } + + /** + * Set result value. + * + * @param val Value to set as a result. + */ + void SetResult(const ResultType& val) + { + res = val; + } + + /** + * Set error. + * + * @param error Error to set. + */ + void SetError(const IgniteError error) + { + err = error; + } + + /** + * Set promise to a state which corresponds to result. + * + * @param promise Promise, which state to set. + */ + void SetPromise(common::Promise& promise) + { + if (err.GetCode() != IgniteError::IGNITE_SUCCESS) + promise.SetError(err); + else + promise.SetValue(std::auto_ptr(new ResultType(res))); + } + + /** + * Write using writer. + * + * @param writer Writer. + */ + void Write(binary::BinaryWriterImpl& writer) + { + if (err.GetCode() != IgniteError::IGNITE_SUCCESS) + { + // Fail + writer.WriteBool(false); + + // Native Exception + writer.WriteBool(true); + + writer.WriteObject(err); + } + else + { + // Success + writer.WriteBool(true); + + writer.WriteObject(res); + } + } + + /** + * Read using reader. + * + * @param reader Reader. + */ + void Read(binary::BinaryReaderImpl& reader) + { + bool success = reader.ReadBool(); + + if (success) + { + res = reader.ReadObject(); + + err = IgniteError(); + } + else + { + bool native = reader.ReadBool(); + + if (native) + err = reader.ReadObject(); + else + { + std::stringstream buf; + + buf << reader.ReadObject() << " : "; + buf << reader.ReadObject() << ", "; + buf << reader.ReadObject(); + + std::string msg = buf.str(); + + err = IgniteError(IgniteError::IGNITE_ERR_GENERIC, msg.c_str()); + } + } + } + + private: + /** Result. */ + ResultType res; + + /** Erorr. */ + IgniteError err; + }; + } + } +} + +#endif //_IGNITE_IMPL_COMPUTE_COMPUTE_JOB_RESULT http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h new file mode 100644 index 0000000..bdd7513 --- /dev/null +++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * Declares ignite::impl::compute::ComputeTaskHolder class and + * ignite::impl::compute::ComputeTaskHolderImpl class template. + */ + +#ifndef _IGNITE_IMPL_COMPUTE_COMPUTE_TASK_IMPL +#define _IGNITE_IMPL_COMPUTE_COMPUTE_TASK_IMPL + +#include + +#include +#include +#include + +namespace ignite +{ + namespace impl + { + namespace compute + { + struct ComputeJobResultPolicy + { + enum Type + { + /** + * Wait for results if any are still expected. If all results have been received - + * it will start reducing results. + */ + WAIT = 0, + + /** + * Ignore all not yet received results and start reducing results. + */ + REDUCE = 1, + + /** + * Fail-over job to execute on another node. + */ + FAILOVER = 2 + }; + }; + + /** + * Compute task holder. Internal helper class. + * Used to handle tasks in general way, without specific types. + */ + class ComputeTaskHolder + { + public: + /** + * Constructor. + * + * @param handle Job handle. + */ + ComputeTaskHolder(int64_t handle) : + handle(handle) + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~ComputeTaskHolder() + { + // No-op. + } + + /** + * Process local job result. + * + * @param job Job. + * @return Policy. + */ + virtual int32_t JobResultLocal(ComputeJobHolder& job) = 0; + + /** + * Process remote job result. + * + * @param job Job. + * @param reader Reader for stream with result. + * @return Policy. + */ + virtual int32_t JobResultRemote(ComputeJobHolder& job, binary::BinaryReaderImpl& reader) = 0; + + /** + * Reduce results of related jobs. + */ + virtual void Reduce() = 0; + + /** + * Get related job handle. + * + * @return Job handle. + */ + int64_t GetJobHandle() + { + return handle; + } + + private: + /** Related job handle. */ + int64_t handle; + }; + + /** + * Compute task holder type-specific implementation. + */ + template + class ComputeTaskHolderImpl : public ComputeTaskHolder + { + public: + typedef F JobType; + typedef R ResultType; + + /** + * Constructor. + * + * @param handle Job handle. + */ + ComputeTaskHolderImpl(int64_t handle) : + ComputeTaskHolder(handle) + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~ComputeTaskHolderImpl() + { + // No-op. + } + + /** + * Process local job result. + * + * @param job Job. + * @return Policy. + */ + virtual int32_t JobResultLocal(ComputeJobHolder& job) + { + typedef ComputeJobHolderImpl ActualComputeJobHolder; + + ActualComputeJobHolder& job0 = static_cast(job); + + res = job0.GetResult(); + + return ComputeJobResultPolicy::WAIT; + } + + /** + * Process remote job result. + * + * @param job Job. + * @param reader Reader for stream with result. + * @return Policy. + */ + virtual int32_t JobResultRemote(ComputeJobHolder& job, binary::BinaryReaderImpl& reader) + { + res.Read(reader); + + return ComputeJobResultPolicy::WAIT; + } + + /** + * Reduce results of related jobs. + */ + virtual void Reduce() + { + res.SetPromise(promise); + } + + /** + * Get result promise. + * + * @return Reference to result promise. + */ + common::Promise& GetPromise() + { + return promise; + } + + private: + /** Result. */ + ComputeJobResult res; + + /** Task result promise. */ + common::Promise promise; + }; + } + } +} + +#endif //_IGNITE_IMPL_COMPUTE_COMPUTE_TASK_IMPL http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h index a99855a..d0de432 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h +++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h @@ -52,6 +52,8 @@ namespace ignite CACHE_ENTRY_FILTER_CREATE = 2, CACHE_ENTRY_FILTER_APPLY = 3, + + COMPUTE_JOB_CREATE = 4, }; }; http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h index e3cb859..13f7b80 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h +++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h @@ -209,6 +209,75 @@ namespace ignite */ common::concurrent::SharedPointer GetBinding() const; + /** + * Get processor compute. + * + * @param proj Projection. + * @return Processor compute. + */ + jobject GetProcessorCompute(jobject proj); + + /** + * Locally execute compute job. + * + * @param jobHandle Job handle. + */ + void ComputeJobExecuteLocal(int64_t jobHandle); + + /** + * Locally commit job execution result for the task. + * + * @param taskHandle Task handle. + * @param jobHandle Job handle. + * @return Reduce politics. + */ + int32_t ComputeTaskLocalJobResult(int64_t taskHandle, int64_t jobHandle); + + /** + * Reduce compute task. + * + * @param taskHandle Task handle. + */ + void ComputeTaskReduce(int64_t taskHandle); + + /** + * Complete compute task. + * + * @param taskHandle Task handle. + */ + void ComputeTaskComplete(int64_t taskHandle); + + /** + * Create compute job. + * + * @param mem Memory. + * @return Job handle. + */ + int64_t ComputeJobCreate(common::concurrent::SharedPointer& mem); + + /** + * Execute compute job. + * + * @param mem Memory. + * @return Job handle. + */ + void ComputeJobExecute(common::concurrent::SharedPointer& mem); + + /** + * Destroy compute job. + * + * @param jobHandle Job handle to destroy. + */ + void ComputeJobDestroy(int64_t jobHandle); + + /** + * Consume result of remote job execution. + * + * @param mem Memory containing result. + * @return Reduce policy. + */ + int32_t ComputeTaskJobResult(common::concurrent::SharedPointer& mem); + private: /** Node configuration. */ IgniteConfiguration* cfg; http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h index 5b1f527..baddec4 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h +++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h @@ -22,10 +22,11 @@ #include #include +#include #include #include #include -#include +#include namespace ignite { @@ -38,7 +39,8 @@ namespace ignite { typedef common::concurrent::SharedPointer SP_IgniteEnvironment; typedef common::concurrent::SharedPointer SP_TransactionsImpl; - typedef common::concurrent::SharedPointer SP_ClusterGroupImpl; + typedef common::concurrent::SharedPointer SP_ComputeImpl; + typedef common::concurrent::SharedPointer SP_IgniteBindingImpl; public: /** * Constructor used to create new instance. @@ -154,7 +156,7 @@ namespace ignite * * @return IgniteBinding class instance. */ - common::concurrent::SharedPointer GetBinding(); + SP_IgniteBindingImpl GetBinding(); /** * Get instance of the implementation from the proxy class. @@ -185,7 +187,7 @@ namespace ignite * * @return TransactionsImpl instance. */ - SP_TransactionsImpl GetTransactions() const + SP_TransactionsImpl GetTransactions() { return txImpl; } @@ -195,11 +197,18 @@ namespace ignite * * @return ClusterGroupImpl instance. */ - SP_ClusterGroupImpl GetProjection() const + cluster::SP_ClusterGroupImpl GetProjection() { return prjImpl; } + /** + * Get compute. + * + * @return ComputeImpl instance. + */ + SP_ComputeImpl GetCompute(); + private: /** * Get transactions internal call. @@ -213,7 +222,7 @@ namespace ignite * * @return ClusterGroupImpl instance. */ - SP_ClusterGroupImpl InternalGetProjection(IgniteError &err); + cluster::SP_ClusterGroupImpl InternalGetProjection(IgniteError &err); /** Environment. */ SP_IgniteEnvironment env; @@ -225,7 +234,7 @@ namespace ignite SP_TransactionsImpl txImpl; /** Projection implementation. */ - SP_ClusterGroupImpl prjImpl; + cluster::SP_ClusterGroupImpl prjImpl; IGNITE_NO_COPY_ASSIGNMENT(IgniteImpl) }; http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h b/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h index f9b2b7f..0384dcc 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h +++ b/modules/platforms/cpp/core/include/ignite/impl/interop/interop_target.h @@ -62,7 +62,7 @@ namespace ignite /** * Destructor. */ - ~InteropTarget(); + virtual ~InteropTarget(); /** * Internal out operation. @@ -135,6 +135,15 @@ namespace ignite OperationResult::Type InStreamOutLong(int32_t opType, InteropMemory& outInMem, IgniteError& err); /** + * In stream out object operation. + * + * @param opType Type of operation. + * @param outInMem Input and output memory. + * @return Java object references. + */ + jobject InStreamOutObject(int32_t opType, InteropMemory& outInMem); + + /** * Internal out-in operation. * * @param opType Operation type. http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/project/vs/core.vcxproj ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj b/modules/platforms/cpp/core/project/vs/core.vcxproj index 5cd49f3..9911ffe 100644 --- a/modules/platforms/cpp/core/project/vs/core.vcxproj +++ b/modules/platforms/cpp/core/project/vs/core.vcxproj @@ -208,6 +208,8 @@ + + @@ -225,6 +227,11 @@ + + + + + @@ -251,6 +258,8 @@ + + http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/project/vs/core.vcxproj.filters ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters index 98099a9..7b84494 100644 --- a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters +++ b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters @@ -55,6 +55,12 @@ Code\impl + + Code\impl\compute + + + Code\impl\compute + @@ -210,6 +216,27 @@ Code\impl + + Code\compute + + + Code\impl\compute + + + Code\compute + + + Code\impl\compute + + + Code\impl\compute + + + Code\impl\compute + + + Code\impl\compute + @@ -257,5 +284,11 @@ {9c5e9732-755a-4553-8926-b4cf3b6abaf3} + + {f1b7ced1-0e6e-4e07-a5b6-04b076797c6f} + + + {ef20cfe1-cd30-429d-a241-575696df8399} + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/ignite.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/ignite.cpp b/modules/platforms/cpp/core/src/ignite.cpp index 2665916..9c42f1d 100644 --- a/modules/platforms/cpp/core/src/ignite.cpp +++ b/modules/platforms/cpp/core/src/ignite.cpp @@ -55,6 +55,11 @@ namespace ignite return transactions::Transactions(txImpl); } + compute::Compute Ignite::GetCompute() + { + return compute::Compute(impl.Get()->GetCompute()); + } + IgniteBinding Ignite::GetBinding() { return impl.Get()->GetBinding(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp b/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp index 1bddeac..c34e828 100644 --- a/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp +++ b/modules/platforms/cpp/core/src/impl/cluster/cluster_group_impl.cpp @@ -37,7 +37,7 @@ namespace ignite ClusterGroupImpl::ClusterGroupImpl(SP_IgniteEnvironment env, jobject javaRef) : InteropTarget(env, javaRef) { - // No-op. + computeImpl = InternalGetCompute(); } ClusterGroupImpl::~ClusterGroupImpl() @@ -45,22 +45,33 @@ namespace ignite // No-op. } - ClusterGroupImpl::SP_ClusterGroupImpl ClusterGroupImpl::ForServers(IgniteError& err) + SP_ClusterGroupImpl ClusterGroupImpl::ForServers() { - JniErrorInfo jniErr; + IgniteError err; jobject res = InOpObject(Command::FOR_SERVERS, err); - if (jniErr.code != java::IGNITE_JNI_ERR_SUCCESS) - return SP_ClusterGroupImpl(); + IgniteError::ThrowIfNeeded(err); return FromTarget(res); } - ClusterGroupImpl::SP_ClusterGroupImpl ClusterGroupImpl::FromTarget(jobject javaRef) + ClusterGroupImpl::SP_ComputeImpl ClusterGroupImpl::GetCompute() + { + return computeImpl; + } + + SP_ClusterGroupImpl ClusterGroupImpl::FromTarget(jobject javaRef) { return SP_ClusterGroupImpl(new ClusterGroupImpl(GetEnvironmentPointer(), javaRef)); } + + ClusterGroupImpl::SP_ComputeImpl ClusterGroupImpl::InternalGetCompute() + { + jobject computeProc = GetEnvironment().GetProcessorCompute(GetTarget()); + + return SP_ComputeImpl(new compute::ComputeImpl(GetEnvironmentPointer(), computeProc)); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/impl/compute/cancelable_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/compute/cancelable_impl.cpp b/modules/platforms/cpp/core/src/impl/compute/cancelable_impl.cpp new file mode 100644 index 0000000..6e61cc8 --- /dev/null +++ b/modules/platforms/cpp/core/src/impl/compute/cancelable_impl.cpp @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +using namespace ignite::common::concurrent; + +namespace +{ + /** + * Operation type. + */ + struct Operation + { + enum Type + { + Cancel = 1 + }; + }; +} + +namespace ignite +{ + namespace impl + { + namespace compute + { + CancelableImpl::CancelableImpl(SharedPointer env, jobject javaRef) : + InteropTarget(env, javaRef), + Cancelable() + { + // No-op. + } + + void CancelableImpl::Cancel() + { + IgniteError err; + + OutInOpLong(Operation::Cancel, 0, err); + + IgniteError::ThrowIfNeeded(err); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/impl/compute/compute_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/compute/compute_impl.cpp b/modules/platforms/cpp/core/src/impl/compute/compute_impl.cpp new file mode 100644 index 0000000..591dd1f --- /dev/null +++ b/modules/platforms/cpp/core/src/impl/compute/compute_impl.cpp @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +using namespace ignite::common::concurrent; + +namespace ignite +{ + namespace impl + { + namespace compute + { + ComputeImpl::ComputeImpl(SharedPointer env, jobject javaRef) : + InteropTarget(env, javaRef) + { + // No-op. + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/impl/ignite_environment.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp index 2231003..4e78f09 100644 --- a/modules/platforms/cpp/core/src/impl/ignite_environment.cpp +++ b/modules/platforms/cpp/core/src/impl/ignite_environment.cpp @@ -20,10 +20,10 @@ #include #include #include +#include #include #include -#include #include #include @@ -47,13 +47,21 @@ namespace ignite enum Type { CACHE_INVOKE = 8, + COMPUTE_TASK_JOB_RESULT = 10, + COMPUTE_TASK_REDUCE = 11, + COMPUTE_TASK_COMPLETE = 12, + COMPUTE_JOB_CREATE = 14, + COMPUTE_JOB_EXECUTE = 15, + COMPUTE_JOB_DESTROY = 17, CONTINUOUS_QUERY_LISTENER_APPLY = 18, CONTINUOUS_QUERY_FILTER_CREATE = 19, CONTINUOUS_QUERY_FILTER_APPLY = 20, CONTINUOUS_QUERY_FILTER_RELEASE = 21, REALLOC = 36, ON_START = 49, - ON_STOP = 50 + ON_STOP = 50, + COMPUTE_TASK_LOCAL_JOB_RESULT = 60, + COMPUTE_JOB_EXECUTE_LOCAL = 61 }; }; @@ -78,6 +86,47 @@ namespace ignite break; } + case OperationCallback::COMPUTE_JOB_CREATE: + { + SharedPointer mem = env->Get()->GetMemory(val); + + res = env->Get()->ComputeJobCreate(mem); + + break; + } + + case OperationCallback::COMPUTE_JOB_EXECUTE: + { + SharedPointer mem = env->Get()->GetMemory(val); + + env->Get()->ComputeJobExecute(mem); + + break; + } + + case OperationCallback::COMPUTE_JOB_DESTROY: + { + env->Get()->ComputeJobDestroy(val); + + break; + } + + case OperationCallback::COMPUTE_TASK_JOB_RESULT: + { + SharedPointer mem = env->Get()->GetMemory(val); + + res = env->Get()->ComputeTaskJobResult(mem); + + break; + } + + case OperationCallback::COMPUTE_TASK_REDUCE: + { + env->Get()->ComputeTaskReduce(val); + + break; + } + case OperationCallback::CONTINUOUS_QUERY_LISTENER_APPLY: { SharedPointer mem = env->Get()->GetMemory(val); @@ -142,10 +191,32 @@ namespace ignite long long IGNITE_CALL InLongLongLongObjectOutLong(void* target, int type, long long val1, long long val2, long long val3, void* arg) { + int64_t res = 0; SharedPointer* env = static_cast*>(target); switch (type) { + case OperationCallback::COMPUTE_JOB_EXECUTE_LOCAL: + { + env->Get()->ComputeJobExecuteLocal(val1); + + break; + } + + case OperationCallback::COMPUTE_TASK_LOCAL_JOB_RESULT: + { + res = env->Get()->ComputeTaskLocalJobResult(val1, val2); + + break; + } + + case OperationCallback::COMPUTE_TASK_COMPLETE: + { + env->Get()->ComputeTaskComplete(val1); + + break; + } + case OperationCallback::ON_START: { env->Get()->OnStartCallback(val1, reinterpret_cast(arg)); @@ -168,7 +239,7 @@ namespace ignite } } - return 0; + return res; } IgniteEnvironment::IgniteEnvironment(const IgniteConfiguration& cfg) : @@ -300,6 +371,189 @@ namespace ignite return binding; } + jobject IgniteEnvironment::GetProcessorCompute(jobject proj) + { + JniErrorInfo jniErr; + + jobject res = ctx.Get()->ProcessorCompute(proc.Get(), proj, &jniErr); + + IgniteError err; + + IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); + + IgniteError::ThrowIfNeeded(err); + + return res; + } + + void IgniteEnvironment::ComputeJobExecuteLocal(int64_t jobHandle) + { + SharedPointer job0 = + StaticPointerCast(registry.Get(jobHandle)); + + compute::ComputeJobHolder* job = job0.Get(); + + if (job) + job->ExecuteLocal(); + else + { + IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION, + "Job is not registred for handle", "jobHandle", jobHandle); + } + } + + int32_t IgniteEnvironment::ComputeTaskLocalJobResult(int64_t taskHandle, int64_t jobHandle) + { + SharedPointer job0 = + StaticPointerCast(registry.Get(jobHandle)); + + compute::ComputeJobHolder* job = job0.Get(); + + SharedPointer task0 = + StaticPointerCast(registry.Get(taskHandle)); + + compute::ComputeTaskHolder* task = task0.Get(); + + if (task && job) + return task->JobResultLocal(*job); + + if (!task) + { + IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION, + "Task is not registred for handle", "taskHandle", taskHandle); + } + + IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION, + "Job is not registred for handle", "jobHandle", jobHandle); + } + + void IgniteEnvironment::ComputeTaskReduce(int64_t taskHandle) + { + SharedPointer task0 = + StaticPointerCast(registry.Get(taskHandle)); + + compute::ComputeTaskHolder* task = task0.Get(); + + if (task) + task->Reduce(); + else + { + IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION, + "Task is not registred for handle", "taskHandle", taskHandle); + } + } + + void IgniteEnvironment::ComputeTaskComplete(int64_t taskHandle) + { + SharedPointer task0 = + StaticPointerCast(registry.Get(taskHandle)); + + compute::ComputeTaskHolder* task = task0.Get(); + + if (task) + { + registry.Release(task->GetJobHandle()); + registry.Release(taskHandle); + } + } + + int64_t IgniteEnvironment::ComputeJobCreate(SharedPointer& mem) + { + if (!binding.Get()) + throw IgniteError(IgniteError::IGNITE_ERR_UNKNOWN, "IgniteBinding is not initialized."); + + InteropInputStream inStream(mem.Get()); + BinaryReaderImpl reader(&inStream); + + InteropOutputStream outStream(mem.Get()); + BinaryWriterImpl writer(&outStream, GetTypeManager()); + + BinaryObjectImpl binJob = BinaryObjectImpl::FromMemory(*mem.Get(), inStream.Position(), 0); + + int32_t jobTypeId = binJob.GetTypeId(); + + bool invoked = false; + + int64_t handle = binding.Get()->InvokeCallback(invoked, + IgniteBindingImpl::CallbackType::COMPUTE_JOB_CREATE, jobTypeId, reader, writer); + + if (!invoked) + { + IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION, + "C++ compute job is not registered on the node (did you compile your program without -rdynamic?).", + "jobTypeId", jobTypeId); + } + + return handle; + } + + void IgniteEnvironment::ComputeJobExecute(SharedPointer& mem) + { + InteropInputStream inStream(mem.Get()); + + InteropOutputStream outStream(mem.Get()); + BinaryWriterImpl writer(&outStream, GetTypeManager()); + + int64_t jobHandle = inStream.ReadInt64(); + + SharedPointer job0 = + StaticPointerCast(registry.Get(jobHandle)); + + compute::ComputeJobHolder* job = job0.Get(); + + if (job) + job->ExecuteRemote(writer); + else + { + IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION, + "Job is not registred for handle", "jobHandle", jobHandle); + } + + outStream.Synchronize(); + } + + void IgniteEnvironment::ComputeJobDestroy(int64_t jobHandle) + { + registry.Release(jobHandle); + } + + int32_t IgniteEnvironment::ComputeTaskJobResult(SharedPointer& mem) + { + InteropInputStream inStream(mem.Get()); + BinaryReaderImpl reader(&inStream); + + int64_t taskHandle = reader.ReadInt64(); + int64_t jobHandle = reader.ReadInt64(); + + // Node GUID + reader.ReadGuid(); + + // Cancel flag + reader.ReadBool(); + + SharedPointer job0 = + StaticPointerCast(registry.Get(jobHandle)); + + compute::ComputeJobHolder* job = job0.Get(); + + SharedPointer task0 = + StaticPointerCast(registry.Get(taskHandle)); + + compute::ComputeTaskHolder* task = task0.Get(); + + if (task && job) + return task->JobResultRemote(*job, reader); + + if (!task) + { + IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION, + "Task is not registred for handle", "taskHandle", taskHandle); + } + + IGNITE_ERROR_FORMATTED_1(IgniteError::IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION, + "Job is not registred for handle", "jobHandle", jobHandle); + } + void IgniteEnvironment::ProcessorReleaseStart() { if (proc.Get()) http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/impl/ignite_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/ignite_impl.cpp b/modules/platforms/cpp/core/src/impl/ignite_impl.cpp index 546cd01..16e954c 100644 --- a/modules/platforms/cpp/core/src/impl/ignite_impl.cpp +++ b/modules/platforms/cpp/core/src/impl/ignite_impl.cpp @@ -59,11 +59,18 @@ namespace ignite return env.Get()->Context(); } - SharedPointer IgniteImpl::GetBinding() + IgniteImpl::SP_IgniteBindingImpl IgniteImpl::GetBinding() { return env.Get()->GetBinding(); } + IgniteImpl::SP_ComputeImpl IgniteImpl::GetCompute() + { + cluster::SP_ClusterGroupImpl serversCluster = prjImpl.Get()->ForServers(); + + return serversCluster.Get()->GetCompute(); + } + IgniteImpl::SP_TransactionsImpl IgniteImpl::InternalGetTransactions(IgniteError &err) { SP_TransactionsImpl res; @@ -80,16 +87,16 @@ namespace ignite return res; } - IgniteImpl::SP_ClusterGroupImpl IgniteImpl::InternalGetProjection(IgniteError& err) + cluster::SP_ClusterGroupImpl IgniteImpl::InternalGetProjection(IgniteError& err) { - SP_ClusterGroupImpl res; + cluster::SP_ClusterGroupImpl res; JniErrorInfo jniErr; jobject txJavaRef = env.Get()->Context()->ProcessorProjection(javaRef, &jniErr); if (txJavaRef) - res = SP_ClusterGroupImpl(new cluster::ClusterGroupImpl(env, txJavaRef)); + res = cluster::SP_ClusterGroupImpl(new cluster::ClusterGroupImpl(env, txJavaRef)); else IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp b/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp index b0932e7..7eed6f3 100644 --- a/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp +++ b/modules/platforms/cpp/core/src/impl/interop/interop_target.cpp @@ -216,6 +216,26 @@ namespace ignite return OperationResult::AI_ERROR; } + jobject InteropTarget::InStreamOutObject(int32_t opType, InteropMemory& outInMem) + { + JniErrorInfo jniErr; + + int64_t outInPtr = outInMem.PointerLong(); + + if (outInPtr) + { + jobject res = env.Get()->Context()->TargetInStreamOutObject(javaRef, opType, outInPtr, &jniErr); + + IgniteError err; + IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err); + IgniteError::ThrowIfNeeded(err); + + return res; + } + + return 0; + } + int64_t InteropTarget::OutInOpLong(int32_t opType, int64_t val, IgniteError& err) { JniErrorInfo jniErr; http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp b/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp index 133b375..b9e976a 100644 --- a/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp +++ b/modules/platforms/cpp/core/src/impl/transactions/transactions_impl.cpp @@ -165,8 +165,6 @@ namespace ignite TransactionState::Type TransactionsImpl::TxCommit(int64_t id, IgniteError& err) { - JniErrorInfo jniErr; - int state = static_cast(OutInOpLong(Operation::COMMIT, id, err)); return ToTransactionState(state); @@ -174,8 +172,6 @@ namespace ignite TransactionState::Type TransactionsImpl::TxRollback(int64_t id, IgniteError& err) { - JniErrorInfo jniErr; - int state = static_cast(OutInOpLong(Operation::ROLLBACK, id, err)); return ToTransactionState(state); @@ -183,8 +179,6 @@ namespace ignite TransactionState::Type TransactionsImpl::TxClose(int64_t id, IgniteError& err) { - JniErrorInfo jniErr; - int state = static_cast(OutInOpLong(Operation::CLOSE, id, err)); return ToTransactionState(state); @@ -192,8 +186,6 @@ namespace ignite bool TransactionsImpl::TxSetRollbackOnly(int64_t id, IgniteError& err) { - JniErrorInfo jniErr; - bool rollbackOnly = OutInOpLong(Operation::SET_ROLLBACK_ONLY, id, err) == 1; return rollbackOnly; @@ -201,8 +193,6 @@ namespace ignite TransactionState::Type TransactionsImpl::TxState(int64_t id, IgniteError& err) { - JniErrorInfo jniErr; - int state = static_cast(OutInOpLong(Operation::STATE, id, err)); return ToTransactionState(state); http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/jni/include/ignite/jni/java.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/include/ignite/jni/java.h b/modules/platforms/cpp/jni/include/ignite/jni/java.h index 85955b3..f6d7207 100644 --- a/modules/platforms/cpp/jni/include/ignite/jni/java.h +++ b/modules/platforms/cpp/jni/include/ignite/jni/java.h @@ -365,6 +365,7 @@ namespace ignite jobject ProcessorDataStreamer(jobject obj, const char* name, bool keepPortable); jobject ProcessorTransactions(jobject obj, JniErrorInfo* errInfo = NULL); jobject ProcessorCompute(jobject obj, jobject prj); + jobject ProcessorCompute(jobject obj, jobject prj, JniErrorInfo* errInfo); jobject ProcessorMessage(jobject obj, jobject prj); jobject ProcessorEvents(jobject obj, jobject prj); jobject ProcessorServices(jobject obj, jobject prj); http://git-wip-us.apache.org/repos/asf/ignite/blob/f9c96de5/modules/platforms/cpp/jni/src/java.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/src/java.cpp b/modules/platforms/cpp/jni/src/java.cpp index 809aa17..bc6af34 100644 --- a/modules/platforms/cpp/jni/src/java.cpp +++ b/modules/platforms/cpp/jni/src/java.cpp @@ -1154,6 +1154,16 @@ namespace ignite return LocalToGlobal(env, res); } + jobject JniContext::ProcessorCompute(jobject obj, jobject prj, JniErrorInfo* errInfo) { + JNIEnv* env = Attach(); + + jobject res = env->CallObjectMethod(obj, jvm->GetMembers().m_PlatformProcessor_compute, prj); + + ExceptionCheck(env, errInfo); + + return LocalToGlobal(env, res); + } + jobject JniContext::ProcessorMessage(jobject obj, jobject prj) { JNIEnv* env = Attach();